e-mhui commented on code in PR #7921:
URL: https://github.com/apache/inlong/pull/7921#discussion_r1176483196
##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/relational/JdbcSourceEventDispatcher.java:
##########
@@ -71,14 +70,14 @@ public class JdbcSourceEventDispatcher extends
EventDispatcher<TableId> {
private static final DocumentWriter DOCUMENT_WRITER =
DocumentWriter.defaultWriter();
- public final ChangeEventQueue<DataChangeEvent> queue;
- public final HistorizedDatabaseSchema historizedSchema;
- public final DataCollectionFilters.DataCollectionFilter<TableId> filter;
- public final CommonConnectorConfig connectorConfig;
- public final TopicSelector<TableId> topicSelector;
- public final Schema schemaChangeKeySchema;
- public final Schema schemaChangeValueSchema;
- public final String topic;
+ private final ChangeEventQueue<DataChangeEvent> queue;
+ private final HistorizedDatabaseSchema historizedSchema;
+ private final DataCollectionFilters.DataCollectionFilter<TableId> filter;
+ private final CommonConnectorConfig connectorConfig;
Review Comment:
These changes will affect another PR,
https://github.com/apache/inlong/pull/7709, so please do not modify this code here.
##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/AppendMetadataCollector.java:
##########
@@ -46,14 +46,14 @@ public AppendMetadataCollector(MetadataConverter[]
metadataConverters, boolean m
}
public void collect(RowData physicalRow, TableChange tableSchema) {
- GenericRowData metaRow = new GenericRowData(metadataConverters.length);
+ GenericRowData metaRow = new GenericRowData(physicalRow.getRowKind(),
metadataConverters.length);
Review Comment:
Why is the `RowKind` added here?
##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/relational/JdbcSourceEventDispatcher.java:
##########
@@ -191,16 +186,14 @@ private Struct schemaChangeRecordKey(SchemaChangeEvent
event) {
}
private Struct schemaChangeRecordValue(SchemaChangeEvent event) throws
IOException {
+ Struct sourceInfo = event.getSource();
Map<String, Object> source = new HashMap<>();
- if (isMysqlConnector(event.getSource())) {
- Struct sourceInfo = event.getSource();
- String fileName =
sourceInfo.getString(BINLOG_FILENAME_OFFSET_KEY);
- Long pos = sourceInfo.getInt64(BINLOG_POSITION_OFFSET_KEY);
- Long serverId = sourceInfo.getInt64(SERVER_ID_KEY);
- source.put(SERVER_ID_KEY, serverId);
- source.put(BINLOG_FILENAME_OFFSET_KEY, fileName);
- source.put(BINLOG_POSITION_OFFSET_KEY, pos);
- }
+ String fileName = sourceInfo.getString(BINLOG_FILENAME_OFFSET_KEY);
+ Long pos = sourceInfo.getInt64(BINLOG_POSITION_OFFSET_KEY);
+ Long serverId = sourceInfo.getInt64(SERVER_ID_KEY);
+ source.put(SERVER_ID_KEY, serverId);
+ source.put(BINLOG_FILENAME_OFFSET_KEY, fileName);
+ source.put(BINLOG_POSITION_OFFSET_KEY, pos);
Review Comment:
These changes will affect another PR,
https://github.com/apache/inlong/pull/7709, so please do not modify this code here.
##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/assigner/HybridSplitAssigner.java:
##########
@@ -115,15 +115,23 @@ public Optional<SourceSplitBase> getNext() {
// stream split assigning
if (isStreamSplitAssigned) {
// no more splits for the assigner
+ LOG.trace(
+ "No more splits for the SnapshotSplitAssigner.
StreamSplit is already assigned.");
return Optional.empty();
} else if (snapshotSplitAssigner.isFinished()) {
// we need to wait snapshot-assigner to be finished before
// assigning the stream split. Otherwise, records emitted from
stream split
// might be out-of-order in terms of same primary key with
snapshot splits.
isStreamSplitAssigned = true;
- return Optional.of(createStreamSplit());
+ StreamSplit streamSplit = createStreamSplit();
+ LOG.trace(
+ "SnapshotSplitAssigner is finished: creating a new
stream split {}",
+ streamSplit);
Review Comment:
If some logs are not important, they can be removed. Excessive logging can
impact performance.
##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java:
##########
@@ -97,25 +100,27 @@ public void submitTask(FetchTask<SourceSplitBase>
fetchTask) {
@Override
public boolean isFinished() {
- return currentStreamSplit == null || !streamFetchTask.isRunning();
+ return currentStreamSplit == null || !currentTaskRunning;
}
@Nullable
@Override
public Iterator<SourceRecords> pollSplitRecords() throws
InterruptedException {
checkReadException();
final List<SourceRecord> sourceRecords = new ArrayList<>();
- if (streamFetchTask.isRunning()) {
+ if (currentTaskRunning) {
Review Comment:
Why replace `streamFetchTask.isRunning()`?
##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java:
##########
@@ -158,7 +157,7 @@ public CommonConnectorConfig getDbzConnectorConfig() {
}
public SchemaNameAdjuster getSchemaNameAdjuster() {
- return null;
+ return SchemaNameAdjuster.create();
Review Comment:
Why not return null?
##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/SerializerUtils.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.util;
+
+import io.debezium.DebeziumException;
+import io.debezium.relational.TableId;
+import io.debezium.util.HexConverter;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+public class SerializerUtils {
Review Comment:
This class can be imported directly from Flink cdc-base instead of being copied.
##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/SourceRecordUtils.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.util;
+
+import io.debezium.data.Envelope;
+import io.debezium.document.DocumentReader;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.util.SchemaNameAdjuster;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+
+import static
org.apache.inlong.sort.cdc.base.relational.JdbcSourceEventDispatcher.HISTORY_RECORD_FIELD;
+import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY;
+import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
+import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
+
+/** Utility class to deal record. */
+public class SourceRecordUtils {
Review Comment:
This class can be imported directly from Flink cdc-base instead of being copied.
##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/EmbeddedFlinkDatabaseHistory.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.source;
+
+import io.debezium.config.Configuration;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import io.debezium.relational.ddl.DdlParser;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecordComparator;
+import io.debezium.relational.history.TableChanges;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A {@link DatabaseHistory} implementation which store the latest table
schema in Flink state.
+ *
+ * <p>It stores/recovers history using data offered by {@link
SourceSplitState}.
+ */
+public class EmbeddedFlinkDatabaseHistory implements DatabaseHistory {
+
Review Comment:
This class can be imported directly from Flink cdc-base instead of being copied.
##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/assigner/state/PendingSplitsStateSerializer.java:
##########
@@ -367,8 +370,9 @@ private List<TableId> readTableIds(DataInputDeserializer
in) throws IOException
List<TableId> tableIds = new ArrayList<>();
final int size = in.readInt();
for (int i = 0; i < size; i++) {
+ boolean useCatalogBeforeSchema = in.readBoolean();
String tableIdStr = in.readUTF();
- tableIds.add(TableId.parse(tableIdStr));
+ tableIds.add(TableId.parse(tableIdStr, useCatalogBeforeSchema));
Review Comment:
Please test whether restoring the task from a savepoint works properly.
##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/assigner/HybridSplitAssigner.java:
##########
@@ -115,15 +115,23 @@ public Optional<SourceSplitBase> getNext() {
// stream split assigning
if (isStreamSplitAssigned) {
// no more splits for the assigner
+ LOG.trace(
+ "No more splits for the SnapshotSplitAssigner.
StreamSplit is already assigned.");
Review Comment:
If some logs are not important, they can be removed. Excessive logging can
impact performance.
##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java:
##########
@@ -97,25 +100,27 @@ public void submitTask(FetchTask<SourceSplitBase>
fetchTask) {
@Override
public boolean isFinished() {
- return currentStreamSplit == null || !streamFetchTask.isRunning();
+ return currentStreamSplit == null || !currentTaskRunning;
Review Comment:
Why replace `!streamFetchTask.isRunning()`?
##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java:
##########
@@ -59,18 +59,17 @@ public JdbcSourceFetchTaskContext(
@Override
public TableId getTableId(SourceRecord record) {
- return null;
+ return SourceRecordUtils.getTableId(record);
}
@Override
public boolean isDataChangeRecord(SourceRecord record) {
- return false;
+ return SourceRecordUtils.isDataChangeRecord(record);
}
Review Comment:
Why not return null?
##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/external/JdbcSourceFetchTaskContext.java:
##########
@@ -59,18 +59,17 @@ public JdbcSourceFetchTaskContext(
@Override
public TableId getTableId(SourceRecord record) {
- return null;
+ return SourceRecordUtils.getTableId(record);
Review Comment:
Why not return null?
##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/ObjectUtils.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.util;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+/** Utilities for operation on {@link Object}. */
+public class ObjectUtils {
Review Comment:
This class can be imported directly from Flink cdc-base instead of being copied.
##########
inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java:
##########
@@ -164,8 +171,7 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
if (taskContext.isDataChangeRecord(sourceRecord)) {
TableId tableId = taskContext.getTableId(sourceRecord);
Offset position = taskContext.getStreamOffset(sourceRecord);
- // source record has no primary need no comparing for binlog
position
- if (hasEnterPureStreamPhase(tableId, position) ||
sourceRecord.key() == null) {
+ if (hasEnterPureStreamPhase(tableId, position)) {
Review Comment:
These changes will affect another PR,
https://github.com/apache/inlong/pull/7489, so please do not modify this code here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]