Oliverwqcwrw commented on code in PR #318:
URL: https://github.com/apache/rocketmq-connect/pull/318#discussion_r982032160


##########
connectors/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceTask.java:
##########
@@ -22,102 +21,82 @@
 import com.alibaba.fastjson.JSONObject;
 import com.datastax.oss.driver.api.core.CqlSession;
 import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.DataEntryBuilder;
-import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import io.openmessaging.connector.api.source.SourceTask;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.storage.OffsetStorageReader;
+import io.openmessaging.internal.DefaultKeyValue;
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import javax.sql.DataSource;
-
 import org.apache.rocketmq.connect.cassandra.common.ConstDefine;
 import org.apache.rocketmq.connect.cassandra.common.DBUtils;
 import org.apache.rocketmq.connect.cassandra.config.Config;
 import org.apache.rocketmq.connect.cassandra.config.ConfigUtil;
 import org.apache.rocketmq.connect.cassandra.schema.Table;
-import org.apache.rocketmq.connect.cassandra.schema.column.ColumnParser;
 import org.apache.rocketmq.connect.cassandra.source.Querier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class CassandraSourceTask extends SourceTask {
 
-    private static final Logger log = 
LoggerFactory.getLogger(org.apache.rocketmq.connect.cassandra.connector.CassandraSourceTask.class);
+    private static final Logger log = 
LoggerFactory.getLogger(CassandraSourceTask.class);
 
     private Config config;
 
-    private DataSource dataSource;
-
     private CqlSession cqlSession;
 
     BlockingQueue<Querier> tableQueue = new LinkedBlockingQueue<Querier>();
-    static final String INCREMENTING_FIELD = "incrementing";
-    static final String TIMESTAMP_FIELD = "timestamp";
     private Querier querier;
 
     public CassandraSourceTask() {
         this.config = new Config();
     }
 
     @Override
-    public Collection<SourceDataEntry> poll() {
-        List<SourceDataEntry> res = new ArrayList<>();
+    public List<ConnectRecord> poll() {
+        List<ConnectRecord> res = new ArrayList<>();
         try {
             if (tableQueue.size() > 1)
                 querier = tableQueue.poll(1000, TimeUnit.MILLISECONDS);
             else
                 querier = tableQueue.peek();
-            Timer timer = new Timer();
             try {
-                Thread.currentThread();
-                Thread.sleep(1000);//毫秒
+                TimeUnit.MILLISECONDS.sleep(1000);
             } catch (Exception e) {
                 throw e;
             }
             querier.poll();
+
             for (Table dataRow : querier.getList()) {
-                JSONObject jsonObject = new JSONObject();
-                jsonObject.put("nextQuery", "database");
-                jsonObject.put("nextPosition", "table");
-                Schema schema = new Schema();
-                schema.setDataSource(dataRow.getDatabase());
-                schema.setName(dataRow.getName());
-                schema.setFields(new ArrayList<>());
-                for (int i = 0; i < dataRow.getColList().size(); i++) {
-                    String columnName = dataRow.getColList().get(i);
-                    String rawDataType = dataRow.getRawDataTypeList().get(i);
-                    Field field = new Field(i, columnName, 
ColumnParser.mapConnectorFieldType(rawDataType));
-                    schema.getFields().add(field);
-                }
-                DataEntryBuilder dataEntryBuilder = new 
DataEntryBuilder(schema);
-                
dataEntryBuilder.timestamp(System.currentTimeMillis()).queue(dataRow.getName())
-                        .entryType(EntryType.UPDATE);
-                for (int i = 0; i < dataRow.getColList().size(); i++) {
-                    Object[] value = new Object[2];
-                    value[0] = value[1] = 
dataRow.getParserList().get(i).getValue(dataRow.getDataList().get(i));
-                    dataEntryBuilder.putFiled(dataRow.getColList().get(i), 
JSONObject.toJSONString(value));
-                }
-
-                SourceDataEntry sourceDataEntry = 
dataEntryBuilder.buildSourceDataEntry(
-                        ByteBuffer.wrap((ConstDefine.PREFIX + 
config.getDbUrl() + config.getDbPort()).getBytes(StandardCharsets.UTF_8)),
-                        
ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8)));
-                res.add(sourceDataEntry);
-                log.debug("sourceDataEntry : {}", 
JSONObject.toJSONString(sourceDataEntry));
+
+                Schema schema = 
SchemaBuilder.struct().name(dataRow.getDatabase()).build();

Review Comment:
   Done.
   Yes, it should be the table name.
   Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to