yunqingmoswu commented on code in PR #6750:
URL: https://github.com/apache/inlong/pull/6750#discussion_r1053020028
##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java:
##########
@@ -164,6 +170,9 @@ public ProducerRecord<byte[], byte[]> serialize(RowData
consumedRow, @Nullable L
final byte[] valueSerialized =
serializeWithDirtyHandle(consumedRow,
DirtyType.VALUE_SERIALIZE_ERROR, valueSerialization);
if (valueSerialized != null) {
+ if (metricData != null) {
Review Comment:
Why is the dirty data metric reported here?
##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java:
##########
@@ -875,7 +902,12 @@ public void onCompletion(RecordMetadata metadata,
Exception e) {
public void onCompletion(RecordMetadata metadata,
Exception exception) {
if (exception != null && asyncException == null) {
asyncException = exception;
- sendDirtyMetrics(rowSize, dataSize);
+ } else if (metadata != null) {
Review Comment:
What should we do if the metadata is null?
##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java:
##########
@@ -214,6 +217,7 @@ public KafkaDynamicSink(
this.topicPattern = topicPattern;
this.dirtyOptions = dirtyOptions;
this.dirtySink = dirtySink;
+ this.migrateAll = migrateAll;
Review Comment:
migrateAll -> multipleSink
##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java:
##########
@@ -199,6 +211,9 @@ public ProducerRecord<byte[], byte[]> serialize(RowData
consumedRow, @Nullable L
} else {
valueSerialized = serializeWithDirtyHandle(valueRow,
DirtyType.VALUE_SERIALIZE_ERROR, valueSerialization);
mayDirtyData = mayDirtyData || valueSerialized == null;
+ if (metricData != null) {
Review Comment:
Why is the dirty data metric reported here?
##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java:
##########
@@ -525,7 +527,8 @@ protected KafkaDynamicSink createKafkaTableSink(
@Nullable String sinkMultipleFormat,
@Nullable String topicPattern,
DirtyOptions dirtyOptions,
- @Nullable DirtySink<Object> dirtySink) {
+ @Nullable DirtySink<Object> dirtySink,
+ boolean migrateAll) {
Review Comment:
migrateAll -> multipleSink
##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java:
##########
@@ -859,10 +883,13 @@ public void open(Configuration configuration) throws
Exception {
@Override
public void onCompletion(RecordMetadata metadata,
Exception e) {
if (e != null) {
- sendDirtyMetrics(rowSize, dataSize);
- LOG.error(
- "Error while sending record to Kafka:
" + e.getMessage(),
- e);
+ LOG.error("Error while sending record to
Kafka: " + e.getMessage(), e);
+ } else if (metadata != null) {
Review Comment:
What should we do if the metadata is null?
##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java:
##########
@@ -179,6 +188,9 @@ public ProducerRecord<byte[], byte[]> serialize(RowData
consumedRow, @Nullable L
} else {
final RowData keyRow = createProjectedRow(consumedRow,
RowKind.INSERT, keyFieldGetters);
keySerialized = serializeWithDirtyHandle(keyRow,
DirtyType.KEY_SERIALIZE_ERROR, keySerialization);
+ if (metricData != null) {
Review Comment:
Why is the dirty data metric reported here?
##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java:
##########
@@ -181,7 +183,8 @@ public KafkaDynamicSink(
@Nullable String sinkMultipleFormat,
@Nullable String topicPattern,
DirtyOptions dirtyOptions,
- @Nullable DirtySink<Object> dirtySink) {
+ @Nullable DirtySink<Object> dirtySink,
+ boolean migrateAll) {
Review Comment:
migrateAll -> multipleSink
##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java:
##########
@@ -440,7 +441,8 @@ public DynamicTableSink createDynamicTableSink(Context
context) {
sinkMultipleFormat,
tableOptions.getOptional(TOPIC_PATTERN).orElse(null),
dirtyOptions,
- dirtySink);
+ dirtySink,
+ migrateAll);
Review Comment:
migrateAll -> multipleSink
##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java:
##########
@@ -319,7 +323,8 @@ public DynamicTableSink copy() {
sinkMultipleFormat,
topicPattern,
dirtyOptions,
- dirtySink);
+ dirtySink,
+ migrateAll);
Review Comment:
migrateAll -> multipleSink
##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java:
##########
@@ -294,7 +296,8 @@ public DynamicTableSink createDynamicTableSink(Context
context) {
null,
null,
dirtyOptions,
- dirtySink);
+ dirtySink,
+ migrateAll);
Review Comment:
migrateAll -> multipleSink
##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java:
##########
@@ -547,6 +550,7 @@ protected KafkaDynamicSink createKafkaTableSink(
sinkMultipleFormat,
topicPattern,
dirtyOptions,
- dirtySink);
+ dirtySink,
+ migrateAll);
Review Comment:
migrateAll -> multipleSink
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]