Oliverwqcwrw commented on code in PR #318:
URL: https://github.com/apache/rocketmq-connect/pull/318#discussion_r979368037
##########
connectors/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkTask.java:
##########
@@ -63,52 +52,28 @@ public CassandraSinkTask() {
}
@Override
- public void put(Collection<SinkDataEntry> sinkDataEntries) {
+ public void put(List<ConnectRecord> connectRecords) {
try {
if (tableQueue.size() > 1) {
updater = tableQueue.poll(1000, TimeUnit.MILLISECONDS);
} else {
updater = tableQueue.peek();
}
log.info("Cassandra Sink Task trying to put()");
- for (SinkDataEntry record : sinkDataEntries) {
- Map<Field, Object[]> fieldMap = new HashMap<>();
- Object[] payloads = record.getPayload();
- Schema schema = record.getSchema();
- EntryType entryType = record.getEntryType();
- String cfName = schema.getName();
- String keyspaceName = schema.getDataSource();
- List<Field> fields = schema.getFields();
- Boolean parseError = false;
- if (!fields.isEmpty()) {
- for (Field field : fields) {
- Object fieldValue = payloads[field.getIndex()];
- Object[] value =
JSONObject.parseArray((String)fieldValue).toArray();
- if (value.length == 2) {
- fieldMap.put(field, value);
- } else {
- log.error("parseArray error, fieldValue:{}",
fieldValue);
- parseError = true;
- }
- }
- }
- if (!parseError) {
- log.info("Cassandra Sink Task trying to call
updater.push()");
- Boolean isSuccess = updater.push(keyspaceName, cfName,
fieldMap, entryType);
- if (!isSuccess) {
- log.error("push data error, keyspaceName:{},
cfName:{}, entryType:{}, fieldMap:{}", keyspaceName, cfName, fieldMap,
entryType);
- }
+ for (ConnectRecord record : connectRecords) {
+ final String dbName =
record.getExtension(ConstDefine.DATABASE_NAME);
+ final String table = record.getExtension(ConstDefine.TABLE);
+ log.info("Cassandra Sink Task trying to call updater.push()");
+ Boolean isSuccess = updater.pushData(dbName, table,
record.getData());
Review Comment:
In
`org.apache.rocketmq.connect.cassandra.sink.Updater#updateRow(java.lang.String,
java.lang.String, java.lang.Object)`
>
final Struct struct = JSON.parseObject(object.toString(),
Struct.class);
final JSONObject jsonObject = JSON.parseObject(object.toString());
final JSONArray values = jsonObject.getJSONArray("values");
int count = 0;
InsertInto insert = QueryBuilder.insertInto(dbName, tableName);
RegularInsert regularInsert = null;
final List<Field> fields = struct.getSchema().getFields();
for (int i = 0; i < fields.size(); i++) {
count++;
final String name = fields.get(i).getName();
Object value = values.get(i);
if (count == 1) {
regularInsert = insert.value(name,
QueryBuilder.literal(value));
} else {
regularInsert = regularInsert.value(name,
QueryBuilder.literal(value));
}
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]