Carl-Zhou-CN commented on code in PR #9446:
URL: https://github.com/apache/seatunnel/pull/9446#discussion_r2191379253


##########
seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReader.java:
##########
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.source.split.ClickhouseSourceSplit;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.clickhouse.client.ClickHouseException;
+import com.clickhouse.client.ClickHouseResponse;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class ClickhouseValueReader implements Serializable {
+    private static final long serialVersionUID = 4588012013447713463L;
+
+    private final ClickhouseSourceSplit clickhouseSourceSplit;
+    private final SeaTunnelRowType rowTypeInfo;
+    private final ClickhouseSourceTable clickhouseSourceTable;
+    private StreamValueReader streamValueReader;
+    private ClickhouseProxy proxy;
+
+    protected int currentPartIndex = 0;
+
+    private List<SeaTunnelRow> rowBatch;
+
+    public ClickhouseValueReader(
+            ClickhouseSourceSplit clickhouseSourceSplit,
+            SeaTunnelRowType seaTunnelRowType,
+            ClickhouseSourceTable clickhouseSourceTable) {
+        this.clickhouseSourceSplit = clickhouseSourceSplit;
+        this.rowTypeInfo = seaTunnelRowType;
+        this.clickhouseSourceTable = clickhouseSourceTable;
+        this.proxy = new 
ClickhouseProxy(clickhouseSourceSplit.getShard().getNode());
+    }
+
+    public boolean hasNext() {
+        if (shouldUseStreamReader()) {
+            if (streamValueReader == null) {
+                streamValueReader = new StreamValueReader();
+            }
+            return streamValueReader.hasNext();
+        } else if (clickhouseSourceTable.isSqlStrategyRead()) {
+            return sqlBatchStrategyRead();
+        } else {
+            return partBatchStrategyRead();
+        }
+    }
+
+    public List<SeaTunnelRow> next() {
+        if (rowBatch == null) {
+            throw new ClickhouseConnectorException(
+                    ClickhouseConnectorErrorCode.SHOULD_NEVER_HAPPEN, "never 
happen error !");
+        }
+
+        return rowBatch;
+    }
+
+    private boolean partBatchStrategyRead() {
+        List<ClickhousePart> parts = clickhouseSourceSplit.getParts();
+        int partSize = parts.size();
+
+        if (currentPartIndex >= partSize) {
+            return false;
+        }
+
+        ClickhousePart currentPart = parts.get(currentPartIndex);
+
+        // If current part has been processed, move to the next part
+        if (currentPart.isEos()) {
+            currentPartIndex++;
+            return currentPartIndex < partSize && partBatchStrategyRead();
+        }
+
+        try {
+            String query = buildPartQuery(currentPart);
+            rowBatch = proxy.batchFetchRecords(query, rowTypeInfo);
+
+            log.debug(
+                    "SplitId: {}, partName: {} read rowBatch size: {}",
+                    clickhouseSourceSplit.getSplitId(),
+                    currentPart.getName(),
+                    rowBatch.size());
+
+            if (rowBatch.isEmpty()) {
+                currentPart.setEos(true);
+                currentPartIndex++;
+                return currentPartIndex < partSize && partBatchStrategyRead();
+            }
+
+            // update part offset
+            currentPart.setOffset(currentPart.getOffset() + rowBatch.size());
+            return true;
+        } catch (Exception e) {
+            throw new ClickhouseConnectorException(
+                    ClickhouseConnectorErrorCode.QUERY_DATA_ERROR,
+                    String.format(
+                            "Failed to read data from part %s, shard: %s, 
splitId: %s, message: %s",
+                            currentPart.getName(),
+                            currentPart.getShard().getNode(),
+                            clickhouseSourceSplit.getSplitId(),
+                            e.getMessage()),
+                    e);
+        }
+    }
+
+    private boolean sqlBatchStrategyRead() {
+        String query = buildSqlQuery();
+
+        try {
+            rowBatch = proxy.batchFetchRecords(query, rowTypeInfo);
+
+            clickhouseSourceSplit.setSqlOffset(
+                    clickhouseSourceSplit.getSqlOffset() + rowBatch.size());
+
+            return !rowBatch.isEmpty();
+        } catch (Exception e) {
+            throw new ClickhouseConnectorException(
+                    ClickhouseConnectorErrorCode.QUERY_DATA_ERROR,
+                    String.format(
+                            "Failed to read data from sql %s, shard: %s, 
splitId %s, message: %s",
+                            query,
+                            clickhouseSourceSplit.getShard().getNode(),
+                            clickhouseSourceSplit.getSplitId(),
+                            e.getMessage()),
+                    e);
+        }
+    }
+
+    public void close() {
+        if (proxy != null) {
+            proxy.close();
+        }
+        if (streamValueReader != null) {
+            streamValueReader.close();
+        }
+    }
+
+    private boolean shouldUseStreamReader() {
+        return clickhouseSourceTable.isComplexSql()
+                || 
StringUtils.isEmpty(clickhouseSourceTable.getClickhouseTable().getSortingKey());
+    }
+
+    private String buildPartQuery(ClickhousePart part) {
+        TablePath tablePath = TablePath.of(part.getDatabase(), 
part.getTable());
+
+        String whereClause = String.format("_part = '%s'", part.getName());
+        if (StringUtils.isNotEmpty(clickhouseSourceTable.getFilterQuery())) {
+            whereClause += " AND (" + clickhouseSourceTable.getFilterQuery() + 
")";
+        }
+
+        String orderByClause = "";
+        if 
(StringUtils.isNotEmpty(clickhouseSourceTable.getClickhouseTable().getSortingKey()))
 {
+            orderByClause =
+                    " ORDER BY " + 
clickhouseSourceTable.getClickhouseTable().getSortingKey();
+        }
+
+        String sql;
+        if (StringUtils.isNotEmpty(orderByClause)) {
+            sql =
+                    String.format(
+                            "SELECT * FROM %s.%s WHERE %s %s LIMIT %d, %d WITH 
TIES",
+                            tablePath.getDatabaseName(),
+                            tablePath.getTableName(),
+                            whereClause,
+                            orderByClause,
+                            part.getOffset(),
+                            clickhouseSourceTable.getBatchSize());
+        } else {
+            sql =
+                    String.format(
+                            "SELECT * FROM %s.%s WHERE %s",
+                            tablePath.getDatabaseName(), 
tablePath.getTableName(), whereClause);
+        }
+
+        return sql;
+    }
+
+    private String buildSqlQuery() {
+        String orderByClause = "";
+        if 
(StringUtils.isNotEmpty(clickhouseSourceTable.getClickhouseTable().getSortingKey()))
 {
+            orderByClause =
+                    " ORDER BY " + 
clickhouseSourceTable.getClickhouseTable().getSortingKey();
+        }
+
+        String executeSql;
+        if (StringUtils.isNotEmpty(orderByClause)) {
+            executeSql =
+                    String.format(
+                            "SELECT * FROM (%s) AS t %s LIMIT %d, %d WITH 
TIES",
+                            clickhouseSourceSplit.getSplitQuery(),
+                            orderByClause,
+                            clickhouseSourceSplit.getSqlOffset(),
+                            clickhouseSourceTable.getBatchSize());
+        } else {
+            executeSql =
+                    String.format("SELECT * FROM (%s) AS t", 
clickhouseSourceSplit.getSplitQuery());
+        }
+
+        return executeSql;
+    }
+
+    private class StreamValueReader implements Serializable {
+        private static final long serialVersionUID = -7037116446966849773L;
+
+        private final BlockingQueue<SeaTunnelRow> rowQueue;
+        private AtomicBoolean eos = new AtomicBoolean(false);
+        private final List<String> sqlList;
+
+        public StreamValueReader() {
+            this.rowQueue = new 
LinkedBlockingDeque<>(clickhouseSourceTable.getBatchSize());
+            this.sqlList = buildSqlList();
+            asyncReadThread.start();
+
+            log.info("StreamValueReader start.");
+        }
+
+        private final Thread asyncReadThread =

Review Comment:
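   Consider giving this thread a descriptive name (e.g. by passing one to the `Thread` constructor or calling `setName`) so it can be identified in thread dumps and logs.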



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to