odbozhou commented on code in PR #318:
URL: https://github.com/apache/rocketmq-connect/pull/318#discussion_r977237070


##########
connectors/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkConnector.java:
##########
@@ -179,62 +183,35 @@ public void buildRoute() {
             }
         } catch (Exception e) {
             log.error("Fetch topic list error.", e);
-        } finally {
-            // srcMQAdminExt.shutdown();
         }
     }
 
 
+
     /**
     * We need to reason why we don't call srcMQAdminExt.shutdown() here, and why
      * it can be applied to srcMQAdminExt
      */
     @Override
     public void stop() {
         listenerHandle.cancel(true);
-        // srcMQAdminExt.shutdown();
     }
 
-    @Override
-    public void pause() {
-
-    }
-
-    @Override
-    public void resume() {
-
-    }
 
     @Override
     public Class<? extends Task> taskClass() {
         return CassandraSinkTask.class;
     }
 
     @Override
-    public List<KeyValue> taskConfigs() {
+    public List<KeyValue> taskConfigs(int maxTasks) {
         log.info("List.start");
         if (!configValid) {
             return new ArrayList<KeyValue>();
         }
-
-        startMQAdminTools();
-
-        buildRoute();
-
-        TaskDivideConfig tdc = new TaskDivideConfig(
-            this.dbConnectorConfig.getDbUrl(),
-            this.dbConnectorConfig.getDbPort(),
-            this.dbConnectorConfig.getDbUserName(),
-            this.dbConnectorConfig.getDbPassword(),
-            this.dbConnectorConfig.getLocalDataCenter(),
-            this.dbConnectorConfig.getConverter(),
-            DataType.COMMON_MESSAGE.ordinal(),
-            this.dbConnectorConfig.getTaskParallelism(),
-            this.dbConnectorConfig.getMode()
-        );
-
-        ((SinkDbConnectorConfig) this.dbConnectorConfig).setTopicRouteMap(topicRouteMap);
-
-        return this.dbConnectorConfig.getTaskDivideStrategy().divide(this.dbConnectorConfig, tdc);
+        List<KeyValue> list = new ArrayList<>();
+        list.add(this.keyValue);

Review Comment:
   The number of task configs should be derived from `maxTasks`, so that the
   connector can run multiple tasks in parallel instead of always creating a
   single one.
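
   A minimal sketch of what this could look like, assuming each task can reuse
   the connector-level config; `DefaultKeyValue` is the OMS helper already
   imported elsewhere in this PR, and `task.id` is a hypothetical key added
   only to show per-task differentiation:

   ```java
   @Override
   public List<KeyValue> taskConfigs(int maxTasks) {
       if (!configValid) {
           return new ArrayList<>();
       }
       List<KeyValue> configs = new ArrayList<>();
       for (int i = 0; i < maxTasks; i++) {
           // Copy the connector config so every task gets its own KeyValue.
           KeyValue taskConfig = new DefaultKeyValue();
           for (String key : this.keyValue.keySet()) {
               taskConfig.put(key, this.keyValue.getString(key));
           }
           taskConfig.put("task.id", i); // illustrative only
           configs.add(taskConfig);
       }
       return configs;
   }
   ```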



##########
connectors/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkTask.java:
##########
@@ -63,52 +52,28 @@ public CassandraSinkTask() {
     }
 
     @Override
-    public void put(Collection<SinkDataEntry> sinkDataEntries) {
+    public void put(List<ConnectRecord> connectRecords) {
         try {
             if (tableQueue.size() > 1) {
                 updater = tableQueue.poll(1000, TimeUnit.MILLISECONDS);
             } else {
                 updater = tableQueue.peek();
             }
             log.info("Cassandra Sink Task trying to put()");
-            for (SinkDataEntry record : sinkDataEntries) {
-                Map<Field, Object[]> fieldMap = new HashMap<>();
-                Object[] payloads = record.getPayload();
-                Schema schema = record.getSchema();
-                EntryType entryType = record.getEntryType();
-                String cfName = schema.getName();
-                String keyspaceName = schema.getDataSource();
-                List<Field> fields = schema.getFields();
-                Boolean parseError = false;
-                if (!fields.isEmpty()) {
-                    for (Field field : fields) {
-                        Object fieldValue = payloads[field.getIndex()];
-                        Object[] value = JSONObject.parseArray((String)fieldValue).toArray();
-                        if (value.length == 2) {
-                            fieldMap.put(field, value);
-                        } else {
-                            log.error("parseArray error, fieldValue:{}", fieldValue);
-                            parseError = true;
-                        }
-                    }
-                }
-                if (!parseError) {
-                    log.info("Cassandra Sink Task trying to call updater.push()");
-                    Boolean isSuccess = updater.push(keyspaceName, cfName, fieldMap, entryType);
-                    if (!isSuccess) {
-                        log.error("push data error, keyspaceName:{}, cfName:{}, entryType:{}, fieldMap:{}", keyspaceName, cfName, fieldMap, entryType);
-                    }
+            for (ConnectRecord record : connectRecords) {
+                final String dbName = record.getExtension(ConstDefine.DATABASE_NAME);
+                final String table = record.getExtension(ConstDefine.TABLE);
+                log.info("Cassandra Sink Task trying to call updater.push()");
+                Boolean isSuccess = updater.pushData(dbName, table, record.getData());

Review Comment:
   Is schema handling missing here? The Connector record's schema should be
   converted into a Cassandra schema before the data is pushed.
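
   A rough sketch of the kind of conversion meant here, assuming the
   openmessaging `Schema`/`Field`/`FieldType` API this PR already imports;
   `toCassandraSchema` and the CQL type mapping are illustrative only:

   ```java
   // Illustrative helper: derive column name -> CQL type from the record's
   // schema so the sink can validate (or create) the target table before
   // calling updater.pushData().
   private Map<String, String> toCassandraSchema(ConnectRecord record) {
       Map<String, String> columns = new LinkedHashMap<>();
       for (Field field : record.getSchema().getFields()) {
           switch (field.getSchema().getFieldType()) {
               case INT32:   columns.put(field.getName(), "int");     break;
               case INT64:   columns.put(field.getName(), "bigint");  break;
               case FLOAT64: columns.put(field.getName(), "double");  break;
               case BOOLEAN: columns.put(field.getName(), "boolean"); break;
               default:      columns.put(field.getName(), "text");    break;
           }
       }
       return columns;
   }
   ```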



##########
connectors/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceTask.java:
##########
@@ -22,102 +21,90 @@
 import com.alibaba.fastjson.JSONObject;
 import com.datastax.oss.driver.api.core.CqlSession;
 import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.DataEntryBuilder;
-import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import io.openmessaging.connector.api.source.SourceTask;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import javax.sql.DataSource;
-
 import org.apache.rocketmq.connect.cassandra.common.ConstDefine;
 import org.apache.rocketmq.connect.cassandra.common.DBUtils;
 import org.apache.rocketmq.connect.cassandra.config.Config;
 import org.apache.rocketmq.connect.cassandra.config.ConfigUtil;
 import org.apache.rocketmq.connect.cassandra.schema.Table;
-import org.apache.rocketmq.connect.cassandra.schema.column.ColumnParser;
 import org.apache.rocketmq.connect.cassandra.source.Querier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CassandraSourceTask extends SourceTask {
 
-    private static final Logger log = LoggerFactory.getLogger(org.apache.rocketmq.connect.cassandra.connector.CassandraSourceTask.class);
+    private static final Logger log = LoggerFactory.getLogger(CassandraSourceTask.class);
 
     private Config config;
 
-    private DataSource dataSource;
-
     private CqlSession cqlSession;
 
     BlockingQueue<Querier> tableQueue = new LinkedBlockingQueue<Querier>();
-    static final String INCREMENTING_FIELD = "incrementing";
-    static final String TIMESTAMP_FIELD = "timestamp";
     private Querier querier;
 
     public CassandraSourceTask() {
         this.config = new Config();
     }
 
     @Override
-    public Collection<SourceDataEntry> poll() {
-        List<SourceDataEntry> res = new ArrayList<>();
+    public List<ConnectRecord> poll() {
+        List<ConnectRecord> res = new ArrayList<>();
         try {
             if (tableQueue.size() > 1)
                 querier = tableQueue.poll(1000, TimeUnit.MILLISECONDS);
             else
                 querier = tableQueue.peek();
-            Timer timer = new Timer();
             try {
-                Thread.currentThread();
-                Thread.sleep(1000); // milliseconds
+                TimeUnit.MILLISECONDS.sleep(1000);
             } catch (Exception e) {
                 throw e;
             }
             querier.poll();
+
             for (Table dataRow : querier.getList()) {
-                JSONObject jsonObject = new JSONObject();
-                jsonObject.put("nextQuery", "database");
-                jsonObject.put("nextPosition", "table");
-                Schema schema = new Schema();
-                schema.setDataSource(dataRow.getDatabase());
-                schema.setName(dataRow.getName());
-                schema.setFields(new ArrayList<>());
-                for (int i = 0; i < dataRow.getColList().size(); i++) {
-                    String columnName = dataRow.getColList().get(i);
-                    String rawDataType = dataRow.getRawDataTypeList().get(i);
-                    Field field = new Field(i, columnName, ColumnParser.mapConnectorFieldType(rawDataType));
-                    schema.getFields().add(field);
-                }
-                DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
-                dataEntryBuilder.timestamp(System.currentTimeMillis()).queue(dataRow.getName())
-                        .entryType(EntryType.UPDATE);
-                for (int i = 0; i < dataRow.getColList().size(); i++) {
-                    Object[] value = new Object[2];
-                    value[0] = value[1] = dataRow.getParserList().get(i).getValue(dataRow.getDataList().get(i));
-                    dataEntryBuilder.putFiled(dataRow.getColList().get(i), JSONObject.toJSONString(value));
+                final RecordOffset recordOffset = this.sourceTaskContext.offsetStorageReader().readOffset(buildRecordPartition(dataRow));
+                Map<String, String> offsetMap = new HashMap<>();
+                if (recordOffset != null && recordOffset.getOffset().size() > 0) {
+                    offsetMap = (Map<String, String>) recordOffset.getOffset();
+                    final Object record = offsetMap.get(dataRow.getDataList().get(0).toString());

Review Comment:
   There seems to be a problem with the usage of 
this.sourceTaskContext.offsetStorageReader().readOffset. The offset obtained 
from this method should be used to pull data from where. The offset in 
ConnectRecord should come from the offset carried by the data
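
   A rough sketch of the intended flow; `table`, `setStartPosition`, and
   `positionOf` are hypothetical stand-ins for however the querier seeks to a
   position and however a row exposes its own position:

   ```java
   // 1) Before querying: resume from the offset persisted for this table.
   RecordPartition partition = buildRecordPartition(table);
   RecordOffset stored = this.sourceTaskContext.offsetStorageReader().readOffset(partition);
   if (stored != null && !stored.getOffset().isEmpty()) {
       querier.setStartPosition((String) stored.getOffset().get("position")); // hypothetical
   }
   querier.poll();

   // 2) After querying: each record carries the position of its own row, so a
   //    restart resumes after the last emitted row instead of re-reading it.
   for (Table dataRow : querier.getList()) {
       Map<String, String> offsetMap = new HashMap<>();
       offsetMap.put("position", positionOf(dataRow)); // hypothetical extractor
       res.add(new ConnectRecord(partition, new RecordOffset(offsetMap), System.currentTimeMillis()));
   }
   ```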


