This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 820eca53d [cdc] Use CdcSourceRecord as source type instead of raw
String (#2783)
820eca53d is described below
commit 820eca53d51b3cf4c4f32478b54f35528fc1681d
Author: Harvey Yue <[email protected]>
AuthorDate: Fri Apr 26 16:37:02 2024 +0800
[cdc] Use CdcSourceRecord as source type instead of raw String (#2783)
Co-authored-by: zhuangchong <[email protected]>
---
.../org/apache/paimon/utils/JsonSerdeUtil.java | 16 ++--
paimon-e2e-tests/pom.xml | 2 +-
paimon-flink/paimon-flink-cdc/pom.xml | 4 +
.../paimon/flink/action/cdc/CdcSourceRecord.java | 83 ++++++++++++++++++
.../flink/action/cdc/MessageQueueSchemaUtils.java | 2 +-
.../flink/action/cdc/SyncDatabaseActionBase.java | 2 +-
.../paimon/flink/action/cdc/SyncJobHandler.java | 4 +-
.../flink/action/cdc/SyncTableActionBase.java | 2 +-
.../action/cdc/SynchronizationActionBase.java | 10 +--
.../flink/action/cdc/format/RecordParser.java | 28 ++++---
.../cdc/format/debezium/DebeziumRecordParser.java | 5 +-
.../flink/action/cdc/kafka/KafkaActionUtils.java | 48 +++++++----
.../action/cdc/mongodb/MongoDBActionUtils.java | 12 +--
.../action/cdc/mongodb/MongoDBRecordParser.java | 11 ++-
.../cdc/mongodb/MongoDBSyncDatabaseAction.java | 3 +-
.../action/cdc/mongodb/MongoDBSyncTableAction.java | 3 +-
.../flink/action/cdc/mysql/MySqlActionUtils.java | 11 +--
.../flink/action/cdc/mysql/MySqlRecordParser.java | 8 +-
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 3 +-
.../action/cdc/mysql/MySqlSyncTableAction.java | 3 +-
.../action/cdc/postgres/PostgresActionUtils.java | 11 +--
.../action/cdc/postgres/PostgresRecordParser.java | 9 +-
.../cdc/postgres/PostgresSyncTableAction.java | 3 +-
.../flink/action/cdc/pulsar/PulsarActionUtils.java | 31 ++++---
.../CdcDebeziumDeserializationSchema.java | 97 ++++++++++++++++++++++
.../CdcJsonDeserializationSchema.java | 74 +++++++++++++++++
.../watermark/CdcTimestampExtractorFactory.java | 17 ++--
.../action/cdc/watermark/CdcWatermarkStrategy.java | 9 +-
.../cdc/kafka/KafkaSyncTableActionITCase.java | 6 +-
29 files changed, 407 insertions(+), 110 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
index aa493dd4a..43d16c04c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
@@ -219,24 +219,22 @@ public class JsonSerdeUtil {
}
/** Parses a JSON string and extracts a value of the specified type from
the given path keys. */
- public static <T> T extractValue(String json, Class<T> valueType,
String... path)
+ public static <T> T extractValue(JsonNode jsonNode, Class<T> valueType,
String... path)
throws JsonProcessingException {
- JsonNode currentNode = OBJECT_MAPPER_INSTANCE.readTree(json);
for (String key : path) {
- currentNode = currentNode.get(key);
- if (currentNode == null) {
+ jsonNode = jsonNode.get(key);
+ if (jsonNode == null) {
throw new IllegalArgumentException("Invalid path or key not
found: " + key);
}
}
- return OBJECT_MAPPER_INSTANCE.treeToValue(currentNode, valueType);
+ return OBJECT_MAPPER_INSTANCE.treeToValue(jsonNode, valueType);
}
/** Checks if a specified node exists in a JSON string. */
- public static boolean isNodeExists(String json, String... path) throws
JsonProcessingException {
- JsonNode currentNode = OBJECT_MAPPER_INSTANCE.readTree(json);
+ public static boolean isNodeExists(JsonNode jsonNode, String... path) {
for (String key : path) {
- currentNode = currentNode.get(key);
- if (currentNode == null) {
+ jsonNode = jsonNode.get(key);
+ if (jsonNode == null) {
return false;
}
}
diff --git a/paimon-e2e-tests/pom.xml b/paimon-e2e-tests/pom.xml
index fdcdbda7a..862b39361 100644
--- a/paimon-e2e-tests/pom.xml
+++ b/paimon-e2e-tests/pom.xml
@@ -33,7 +33,7 @@ under the License.
<properties>
<flink.shaded.hadoop.version>2.8.3-10.0</flink.shaded.hadoop.version>
- <flink.cdc.version>2.3.0</flink.cdc.version>
+ <flink.cdc.version>2.4.2</flink.cdc.version>
<flink.sql.connector.hive>flink-sql-connector-hive-2.3.9_${scala.binary.version}</flink.sql.connector.hive>
</properties>
diff --git a/paimon-flink/paimon-flink-cdc/pom.xml
b/paimon-flink/paimon-flink-cdc/pom.xml
index 1b18833a4..868eac71f 100644
--- a/paimon-flink/paimon-flink-cdc/pom.xml
+++ b/paimon-flink/paimon-flink-cdc/pom.xml
@@ -280,6 +280,10 @@ under the License.
<configuration>
<relocations>
<!-- Same as flink-sql-connector-kafka. -->
+ <relocation>
+ <pattern>org.apache.kafka.connect</pattern>
+
<shadedPattern>com.ververica.cdc.connectors.shaded.org.apache.kafka.connect</shadedPattern>
+ </relocation>
<relocation>
<pattern>org.apache.kafka</pattern>
<shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java
new file mode 100644
index 000000000..51a14534c
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcSourceRecord.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** A data change record from the CDC source. */
+public class CdcSourceRecord implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ @Nullable private final String topic;
+
+ @Nullable private final Object key;
+
+ // TODO Use generics to support more scenarios.
+ private final Object value;
+
+ public CdcSourceRecord(@Nullable String topic, @Nullable Object key,
Object value) {
+ this.topic = topic;
+ this.key = key;
+ this.value = value;
+ }
+
+ public CdcSourceRecord(Object value) {
+ this(null, null, value);
+ }
+
+ @Nullable
+ public String getTopic() {
+ return topic;
+ }
+
+ @Nullable
+ public Object getKey() {
+ return key;
+ }
+
+ public Object getValue() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof CdcSourceRecord)) {
+ return false;
+ }
+
+ CdcSourceRecord that = (CdcSourceRecord) o;
+ return Objects.equals(topic, that.topic)
+ && Objects.equals(key, that.key)
+ && Objects.equals(value, that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topic, key, value);
+ }
+
+ @Override
+ public String toString() {
+ return topic + ": " + key + " " + value;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
index 590f63911..73e63bcd0 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
@@ -89,7 +89,7 @@ public class MessageQueueSchemaUtils {
/** Wrap the consumer for different message queues. */
public interface ConsumerWrapper extends AutoCloseable {
- List<String> getRecords(int pollTimeOutMills);
+ List<CdcSourceRecord> getRecords(int pollTimeOutMills);
String topic();
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index ecfca9a17..4fa6ca76c 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -110,7 +110,7 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
}
@Override
- protected FlatMapFunction<String, RichCdcMultiplexRecord> recordParse() {
+ protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord>
recordParse() {
return syncJobHandler.provideRecordParser(
caseSensitive, Collections.emptyList(), typeMapping,
metadataConverters);
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
index 255e1ca7e..c674e560b 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
@@ -180,7 +180,7 @@ public class SyncJobHandler {
}
}
- public Source<String, ?, ?> provideSource() {
+ public Source<CdcSourceRecord, ?, ?> provideSource() {
switch (sourceType) {
case KAFKA:
return KafkaActionUtils.buildKafkaSource(cdcSourceConfig);
@@ -192,7 +192,7 @@ public class SyncJobHandler {
}
}
- public FlatMapFunction<String, RichCdcMultiplexRecord> provideRecordParser(
+ public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord>
provideRecordParser(
boolean caseSensitive,
List<ComputedColumn> computedColumns,
TypeMapping typeMapping,
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index ff271ad59..f4e5bfe6a 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -155,7 +155,7 @@ public abstract class SyncTableActionBase extends
SynchronizationActionBase {
}
@Override
- protected FlatMapFunction<String, RichCdcMultiplexRecord> recordParse() {
+ protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord>
recordParse() {
return syncJobHandler.provideRecordParser(
caseSensitive, computedColumns, typeMapping,
metadataConverters);
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
index 5c04d5707..944185347 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
@@ -132,7 +132,7 @@ public abstract class SynchronizationActionBase extends
ActionBase {
return syncJobHandler.provideSource();
}
- private DataStreamSource<String> buildDataStreamSource(Object source) {
+ private DataStreamSource<CdcSourceRecord> buildDataStreamSource(Object
source) {
if (source instanceof Source) {
boolean isAutomaticWatermarkCreationEnabled =
tableConfig.containsKey(CoreOptions.TAG_AUTOMATIC_CREATION.key())
@@ -143,7 +143,7 @@ public abstract class SynchronizationActionBase extends
ActionBase {
Options options = Options.fromMap(tableConfig);
Duration idleTimeout = options.get(SCAN_WATERMARK_IDLE_TIMEOUT);
String watermarkAlignGroup =
options.get(SCAN_WATERMARK_ALIGNMENT_GROUP);
- WatermarkStrategy<String> watermarkStrategy =
+ WatermarkStrategy<CdcSourceRecord> watermarkStrategy =
isAutomaticWatermarkCreationEnabled
? watermarkAlignGroup != null
? new
CdcWatermarkStrategy(createExtractor(source))
@@ -158,18 +158,18 @@ public abstract class SynchronizationActionBase extends
ActionBase {
watermarkStrategy =
watermarkStrategy.withIdleness(idleTimeout);
}
return env.fromSource(
- (Source<String, ?, ?>) source,
+ (Source<CdcSourceRecord, ?, ?>) source,
watermarkStrategy,
syncJobHandler.provideSourceName());
}
if (source instanceof SourceFunction) {
return env.addSource(
- (SourceFunction<String>) source,
syncJobHandler.provideSourceName());
+ (SourceFunction<CdcSourceRecord>) source,
syncJobHandler.provideSourceName());
}
throw new UnsupportedOperationException("Unrecognized source type");
}
- protected abstract FlatMapFunction<String, RichCdcMultiplexRecord>
recordParse();
+ protected abstract FlatMapFunction<CdcSourceRecord,
RichCdcMultiplexRecord> recordParse();
protected abstract EventParser.Factory<RichCdcMultiplexRecord>
buildEventParserFactory();
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
index 4dcbb2a81..f4543759a 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action.cdc.format;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
@@ -26,7 +27,6 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
-import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.TypeUtils;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -55,8 +55,10 @@ import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDupl
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
+import static org.apache.paimon.utils.JsonSerdeUtil.convertValue;
import static org.apache.paimon.utils.JsonSerdeUtil.getNodeAs;
import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
+import static org.apache.paimon.utils.JsonSerdeUtil.writeValueAsString;
/**
* Provides a base implementation for parsing messages of various formats into
{@link
@@ -66,7 +68,8 @@ import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
* Subclasses are expected to provide specific implementations for extracting
records, validating
* message formats, and other format-specific operations.
*/
-public abstract class RecordParser implements FlatMapFunction<String,
RichCdcMultiplexRecord> {
+public abstract class RecordParser
+ implements FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> {
private static final Logger LOG =
LoggerFactory.getLogger(RecordParser.class);
@@ -86,7 +89,7 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
}
@Nullable
- public Schema buildSchema(String record) {
+ public Schema buildSchema(CdcSourceRecord record) {
try {
setRoot(record);
if (isDDL()) {
@@ -103,7 +106,7 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
builder.primaryKey(extractPrimaryKeys());
return builder.build();
} catch (Exception e) {
- logInvalidJsonString(record);
+ logInvalidSourceRecord(record);
throw e;
}
}
@@ -126,12 +129,12 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
}
@Override
- public void flatMap(String value, Collector<RichCdcMultiplexRecord> out) {
+ public void flatMap(CdcSourceRecord value,
Collector<RichCdcMultiplexRecord> out) {
try {
setRoot(value);
extractRecords().forEach(out::collect);
} catch (Exception e) {
- logInvalidJsonString(value);
+ logInvalidSourceRecord(value);
throw e;
}
}
@@ -140,7 +143,7 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
JsonNode record, LinkedHashMap<String, DataType> paimonFieldTypes)
{
paimonFieldTypes.putAll(fillDefaultTypes(record));
Map<String, Object> recordMap =
- JsonSerdeUtil.convertValue(record, new
TypeReference<Map<String, Object>>() {});
+ convertValue(record, new TypeReference<Map<String, Object>>()
{});
Map<String, String> rowData =
recordMap.entrySet().stream()
.filter(entry -> Objects.nonNull(entry.getKey()))
@@ -151,8 +154,7 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
if
(Objects.nonNull(entry.getValue())
&&
!TypeUtils.isBasicType(entry.getValue())) {
try {
- return
JsonSerdeUtil.writeValueAsString(
- entry.getValue());
+ return
writeValueAsString(entry.getValue());
} catch
(JsonProcessingException e) {
LOG.error("Failed to
deserialize record.", e);
return
Objects.toString(entry.getValue());
@@ -217,8 +219,8 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
new CdcRecord(rowKind, data));
}
- protected void setRoot(String record) {
- root = JsonSerdeUtil.fromJson(record, JsonNode.class);
+ protected void setRoot(CdcSourceRecord record) {
+ root = (JsonNode) record.getValue();
}
protected JsonNode mergeOldRecord(JsonNode data, JsonNode oldNode) {
@@ -243,8 +245,8 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
return isNull(node) ? null : node.asText();
}
- private void logInvalidJsonString(String json) {
- LOG.info("Invalid Json:\n{}", json);
+ private void logInvalidSourceRecord(CdcSourceRecord record) {
+ LOG.error("Invalid source record:\n{}", record.toString());
}
protected void checkNotNull(JsonNode node, String key) {
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
index 96f1bc074..471856462 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action.cdc.format.debezium;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.RecordParser;
@@ -119,8 +120,8 @@ public class DebeziumRecordParser extends RecordParser {
}
@Override
- protected void setRoot(String record) {
- JsonNode node = JsonSerdeUtil.fromJson(record, JsonNode.class);
+ protected void setRoot(CdcSourceRecord record) {
+ JsonNode node = (JsonNode) record.getValue();
hasSchema = false;
if (node.has(FIELD_SCHEMA)) {
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 64a543beb..4f0be0ef2 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -18,11 +18,12 @@
package org.apache.paimon.flink.action.cdc.kafka;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import
org.apache.paimon.flink.action.cdc.serialization.CdcJsonDeserializationSchema;
import org.apache.paimon.utils.StringUtils;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
@@ -34,14 +35,14 @@ import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
@@ -69,8 +70,8 @@ public class KafkaActionUtils {
private static final String PARTITION = "partition";
private static final String OFFSET = "offset";
- public static KafkaSource<String> buildKafkaSource(Configuration
kafkaConfig) {
- KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource.builder();
+ public static KafkaSource<CdcSourceRecord> buildKafkaSource(Configuration
kafkaConfig) {
+ KafkaSourceBuilder<CdcSourceRecord> kafkaSourceBuilder =
KafkaSource.builder();
if (kafkaConfig.contains(KafkaConnectorOptions.TOPIC)) {
List<String> topics =
@@ -83,11 +84,9 @@ public class KafkaActionUtils {
Pattern.compile(kafkaConfig.get(KafkaConnectorOptions.TOPIC_PATTERN)));
}
- KafkaValueOnlyDeserializationSchemaWrapper<String> schema =
- new KafkaValueOnlyDeserializationSchemaWrapper<>(new
SimpleStringSchema());
- kafkaSourceBuilder.setDeserializer(schema);
-
- kafkaSourceBuilder.setGroupId(kafkaPropertiesGroupId(kafkaConfig));
+ kafkaSourceBuilder
+ .setValueOnlyDeserializer(new CdcJsonDeserializationSchema())
+ .setGroupId(kafkaPropertiesGroupId(kafkaConfig));
Properties properties = createKafkaProperties(kafkaConfig);
@@ -252,13 +251,17 @@ public class KafkaActionUtils {
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaConfig.get(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS));
props.put(ConsumerConfig.GROUP_ID_CONFIG,
kafkaPropertiesGroupId(kafkaConfig));
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ props.put(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+ KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
String topic;
if (kafkaConfig.contains(KafkaConnectorOptions.TOPIC)) {
@@ -310,20 +313,29 @@ public class KafkaActionUtils {
private static class KafkaConsumerWrapper implements
MessageQueueSchemaUtils.ConsumerWrapper {
- private final KafkaConsumer<String, String> consumer;
+ private final KafkaConsumer<byte[], byte[]> consumer;
private final String topic;
- KafkaConsumerWrapper(KafkaConsumer<String, String> kafkaConsumer,
String topic) {
+ KafkaConsumerWrapper(KafkaConsumer<byte[], byte[]> kafkaConsumer,
String topic) {
this.consumer = kafkaConsumer;
this.topic = topic;
}
@Override
- public List<String> getRecords(int pollTimeOutMills) {
- ConsumerRecords<String, String> consumerRecords =
+ public List<CdcSourceRecord> getRecords(int pollTimeOutMills) {
+ ConsumerRecords<byte[], byte[]> consumerRecords =
consumer.poll(Duration.ofMillis(pollTimeOutMills));
+ CdcJsonDeserializationSchema deserializationSchema = new
CdcJsonDeserializationSchema();
return
StreamSupport.stream(consumerRecords.records(topic).spliterator(), false)
- .map(ConsumerRecord::value)
+ .map(
+ consumerRecord -> {
+ try {
+ return deserializationSchema.deserialize(
+ consumerRecord.value());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
.collect(Collectors.toList());
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
index c1da23b0b..65d22ba33 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
@@ -18,12 +18,14 @@
package org.apache.paimon.flink.action.cdc.mongodb;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+import
org.apache.paimon.flink.action.cdc.serialization.CdcDebeziumDeserializationSchema;
+
import com.ververica.cdc.connectors.base.options.SourceOptions;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceBuilder;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
@@ -83,9 +85,9 @@ public class MongoDBActionUtils {
.withDescription(
"Determines whether to use the default MongoDB _id
generation strategy. If set to true, the default _id generation will remove the
outer $oid nesting. If set to false, no additional processing will be done on
the _id field.");
- public static MongoDBSource<String> buildMongodbSource(
+ public static MongoDBSource<CdcSourceRecord> buildMongodbSource(
Configuration mongodbConfig, String tableList) {
- MongoDBSourceBuilder<String> sourceBuilder = MongoDBSource.builder();
+ MongoDBSourceBuilder<CdcSourceRecord> sourceBuilder =
MongoDBSource.builder();
if (mongodbConfig.contains(MongoDBSourceOptions.USERNAME)
&& mongodbConfig.contains(MongoDBSourceOptions.PASSWORD)) {
@@ -132,8 +134,8 @@ public class MongoDBActionUtils {
Map<String, Object> customConverterConfigs = new HashMap<>();
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
"numeric");
- JsonDebeziumDeserializationSchema schema =
- new JsonDebeziumDeserializationSchema(false,
customConverterConfigs);
+ CdcDebeziumDeserializationSchema schema =
+ new CdcDebeziumDeserializationSchema(false,
customConverterConfigs);
return sourceBuilder.deserializer(schema).build();
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
index 1307c070a..9264e409f 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action.cdc.mongodb;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import
org.apache.paimon.flink.action.cdc.mongodb.strategy.Mongo4VersionStrategy;
import
org.apache.paimon.flink.action.cdc.mongodb.strategy.MongoVersionStrategy;
@@ -33,7 +34,7 @@ import org.apache.flink.util.Collector;
import java.util.List;
/**
- * A parser for MongoDB Debezium JSON strings, converting them into a list of
{@link
+ * A parser for MongoDB Debezium JSON records, converting them into a list of
{@link
* RichCdcMultiplexRecord}s.
*
* <p>This parser is designed to process and transform incoming MongoDB
Debezium JSON records into a
@@ -51,7 +52,8 @@ import java.util.List;
* <p>Note: This parser is primarily intended for use in Flink streaming
applications that process
* MongoDB CDC data.
*/
-public class MongoDBRecordParser implements FlatMapFunction<String,
RichCdcMultiplexRecord> {
+public class MongoDBRecordParser
+ implements FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> {
private static final String FIELD_DATABASE = "db";
private static final String FIELD_TABLE = "coll";
@@ -72,8 +74,9 @@ public class MongoDBRecordParser implements
FlatMapFunction<String, RichCdcMulti
}
@Override
- public void flatMap(String value, Collector<RichCdcMultiplexRecord> out)
throws Exception {
- root = OBJECT_MAPPER.readValue(value, JsonNode.class);
+ public void flatMap(CdcSourceRecord value,
Collector<RichCdcMultiplexRecord> out)
+ throws Exception {
+ root = OBJECT_MAPPER.readValue((String) value.getValue(),
JsonNode.class);
String databaseName = extractString(FIELD_DATABASE);
String collection = extractString(FIELD_TABLE);
MongoVersionStrategy versionStrategy =
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index 3fcfccd07..7881aa58b 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.action.cdc.mongodb;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.SyncDatabaseActionBase;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
@@ -58,7 +59,7 @@ public class MongoDBSyncDatabaseAction extends
SyncDatabaseActionBase {
}
@Override
- protected MongoDBSource<String> buildSource() {
+ protected MongoDBSource<CdcSourceRecord> buildSource() {
return MongoDBActionUtils.buildMongodbSource(
cdcSourceConfig,
CdcActionCommonUtils.combinedModeTableList(
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
index 32596863a..0c5b07886 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action.cdc.mongodb;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
import org.apache.paimon.flink.action.cdc.SyncTableActionBase;
import org.apache.paimon.schema.Schema;
@@ -69,7 +70,7 @@ public class MongoDBSyncTableAction extends
SyncTableActionBase {
}
@Override
- protected MongoDBSource<String> buildSource() {
+ protected MongoDBSource<CdcSourceRecord> buildSource() {
String tableList =
cdcSourceConfig.get(MongoDBSourceOptions.DATABASE)
+ "\\."
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 6269fb291..cc9644b54 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -19,9 +19,11 @@
package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.schema.JdbcSchemaUtils;
import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo;
+import
org.apache.paimon.flink.action.cdc.serialization.CdcDebeziumDeserializationSchema;
import org.apache.paimon.schema.Schema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
@@ -30,7 +32,6 @@ import
com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import com.ververica.cdc.debezium.utils.JdbcUrlUtils;
import org.apache.flink.configuration.ConfigOption;
@@ -142,9 +143,9 @@ public class MySqlActionUtils {
return mySqlSchemasInfo;
}
- public static MySqlSource<String> buildMySqlSource(
+ public static MySqlSource<CdcSourceRecord> buildMySqlSource(
Configuration mySqlConfig, String tableList, TypeMapping
typeMapping) {
- MySqlSourceBuilder<String> sourceBuilder = MySqlSource.builder();
+ MySqlSourceBuilder<CdcSourceRecord> sourceBuilder =
MySqlSource.builder();
sourceBuilder
.hostname(mySqlConfig.get(MySqlSourceOptions.HOSTNAME))
@@ -220,8 +221,8 @@ public class MySqlActionUtils {
Map<String, Object> customConverterConfigs = new HashMap<>();
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
"numeric");
- JsonDebeziumDeserializationSchema schema =
- new JsonDebeziumDeserializationSchema(true,
customConverterConfigs);
+ CdcDebeziumDeserializationSchema schema =
+ new CdcDebeziumDeserializationSchema(true,
customConverterConfigs);
boolean scanNewlyAddedTables =
mySqlConfig.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
index 72b42b466..45dcaa3c2 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
@@ -69,7 +70,7 @@ import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
* A parser for MySql Debezium JSON strings, converting them into a list of
{@link
* RichCdcMultiplexRecord}s.
*/
-public class MySqlRecordParser implements FlatMapFunction<String,
RichCdcMultiplexRecord> {
+public class MySqlRecordParser implements FlatMapFunction<CdcSourceRecord,
RichCdcMultiplexRecord> {
private static final Logger LOG =
LoggerFactory.getLogger(MySqlRecordParser.class);
@@ -109,8 +110,9 @@ public class MySqlRecordParser implements
FlatMapFunction<String, RichCdcMultipl
}
@Override
- public void flatMap(String rawEvent, Collector<RichCdcMultiplexRecord>
out) throws Exception {
- root = objectMapper.readValue(rawEvent, DebeziumEvent.class);
+ public void flatMap(CdcSourceRecord rawEvent,
Collector<RichCdcMultiplexRecord> out)
+ throws Exception {
+ root = objectMapper.readValue((String) rawEvent.getValue(),
DebeziumEvent.class);
currentTable =
root.payload().source().get(AbstractSourceInfo.TABLE_NAME_KEY).asText();
databaseName =
root.payload().source().get(AbstractSourceInfo.DATABASE_NAME_KEY).asText();
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index da3b80c5f..80a7c8b84 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -23,6 +23,7 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.SyncDatabaseActionBase;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
@@ -179,7 +180,7 @@ public class MySqlSyncDatabaseAction extends
SyncDatabaseActionBase {
}
@Override
- protected MySqlSource<String> buildSource() {
+ protected MySqlSource<CdcSourceRecord> buildSource() {
return MySqlActionUtils.buildMySqlSource(
cdcSourceConfig,
tableList(
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 919cdab3c..de6c22a21 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.Action;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
import org.apache.paimon.flink.action.cdc.SyncTableActionBase;
import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo;
@@ -99,7 +100,7 @@ public class MySqlSyncTableAction extends
SyncTableActionBase {
}
@Override
- protected MySqlSource<String> buildSource() {
+ protected MySqlSource<CdcSourceRecord> buildSource() {
String tableList =
String.format(
"(%s)\\.(%s)",
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java
index 49b78db41..45d8f69cc 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java
@@ -19,9 +19,11 @@
package org.apache.paimon.flink.action.cdc.postgres;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.schema.JdbcSchemaUtils;
import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo;
+import
org.apache.paimon.flink.action.cdc.serialization.CdcDebeziumDeserializationSchema;
import org.apache.paimon.options.OptionsUtils;
import org.apache.paimon.schema.Schema;
@@ -30,7 +32,6 @@ import
com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import
com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
import
com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.connect.json.JsonConverterConfig;
@@ -116,9 +117,9 @@ public class PostgresActionUtils {
return jdbcSchemasInfo;
}
- public static JdbcIncrementalSource<String> buildPostgresSource(
+ public static JdbcIncrementalSource<CdcSourceRecord> buildPostgresSource(
Configuration postgresConfig, String[] schemaList, String[]
tableList) {
- PostgresSourceBuilder<String> sourceBuilder =
PostgresIncrementalSource.builder();
+ PostgresSourceBuilder<CdcSourceRecord> sourceBuilder =
PostgresIncrementalSource.builder();
sourceBuilder
.hostname(postgresConfig.get(PostgresSourceOptions.HOSTNAME))
@@ -170,8 +171,8 @@ public class PostgresActionUtils {
Map<String, Object> customConverterConfigs = new HashMap<>();
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
"numeric");
- JsonDebeziumDeserializationSchema schema =
- new JsonDebeziumDeserializationSchema(true,
customConverterConfigs);
+ CdcDebeziumDeserializationSchema schema =
+ new CdcDebeziumDeserializationSchema(true,
customConverterConfigs);
return
sourceBuilder.deserializer(schema).includeSchemaChanges(true).build();
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
index 6a2ef96a1..de54b97ae 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.action.cdc.postgres;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
@@ -81,7 +82,8 @@ import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
* A parser for PostgreSQL Debezium JSON strings, converting them into a list
of {@link
* RichCdcMultiplexRecord}s.
*/
-public class PostgresRecordParser implements FlatMapFunction<String,
RichCdcMultiplexRecord> {
+public class PostgresRecordParser
+ implements FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> {
private static final Logger LOG =
LoggerFactory.getLogger(PostgresRecordParser.class);
@@ -132,8 +134,9 @@ public class PostgresRecordParser implements
FlatMapFunction<String, RichCdcMult
}
@Override
- public void flatMap(String rawEvent, Collector<RichCdcMultiplexRecord>
out) throws Exception {
- root = objectMapper.readValue(rawEvent, DebeziumEvent.class);
+ public void flatMap(CdcSourceRecord rawEvent,
Collector<RichCdcMultiplexRecord> out)
+ throws Exception {
+ root = objectMapper.readValue((String) rawEvent.getValue(),
DebeziumEvent.class);
currentTable =
root.payload().source().get(AbstractSourceInfo.TABLE_NAME_KEY).asText();
databaseName =
root.payload().source().get(AbstractSourceInfo.DATABASE_NAME_KEY).asText();
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableAction.java
index 6dbcf29da..e7d692b91 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableAction.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.action.cdc.postgres;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.Action;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
import org.apache.paimon.flink.action.cdc.SyncTableActionBase;
import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo;
@@ -101,7 +102,7 @@ public class PostgresSyncTableAction extends
SyncTableActionBase {
}
@Override
- protected JdbcIncrementalSource<String> buildSource() {
+ protected JdbcIncrementalSource<CdcSourceRecord> buildSource() {
List<JdbcSchemasInfo.JdbcSchemaInfo> pkTables =
postgresSchemasInfo.pkTables();
Set<String> schemaList = new HashSet<>();
String[] tableList = new String[pkTables.size()];
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
index 27560b835..66bdc1847 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java
@@ -18,10 +18,11 @@
package org.apache.paimon.flink.action.cdc.pulsar;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import
org.apache.paimon.flink.action.cdc.serialization.CdcJsonDeserializationSchema;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
@@ -50,6 +51,7 @@ import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -162,15 +164,15 @@ public class PulsarActionUtils {
.defaultValue(true)
.withDescription("To specify the boundedness of a
stream.");
- public static PulsarSource<String> buildPulsarSource(Configuration
pulsarConfig) {
- PulsarSourceBuilder<String> pulsarSourceBuilder =
PulsarSource.builder();
+ public static PulsarSource<CdcSourceRecord>
buildPulsarSource(Configuration pulsarConfig) {
+ PulsarSourceBuilder<CdcSourceRecord> pulsarSourceBuilder =
PulsarSource.builder();
// the minimum setup
pulsarSourceBuilder
.setServiceUrl(pulsarConfig.get(PULSAR_SERVICE_URL))
.setAdminUrl(pulsarConfig.get(PULSAR_ADMIN_URL))
.setSubscriptionName(pulsarConfig.get(PULSAR_SUBSCRIPTION_NAME))
- .setDeserializationSchema(new SimpleStringSchema());
+ .setDeserializationSchema(new CdcJsonDeserializationSchema());
pulsarConfig.getOptional(TOPIC).ifPresent(pulsarSourceBuilder::setTopics);
pulsarConfig.getOptional(TOPIC_PATTERN).ifPresent(pulsarSourceBuilder::setTopicPattern);
@@ -296,10 +298,10 @@ public class PulsarActionUtils {
SourceConfiguration pulsarSourceConfiguration = new
SourceConfiguration(pulsarConfig);
PulsarClient pulsarClient =
PulsarClientFactory.createClient(pulsarSourceConfiguration);
- ConsumerBuilder<String> consumerBuilder =
+ ConsumerBuilder<byte[]> consumerBuilder =
createConsumerBuilder(
pulsarClient,
- org.apache.pulsar.client.api.Schema.STRING,
+ org.apache.pulsar.client.api.Schema.BYTES,
pulsarSourceConfiguration);
// The default position is Latest
@@ -325,7 +327,7 @@ public class PulsarActionUtils {
}
// Create the consumer configuration by using common utils.
- Consumer<String> consumer = consumerBuilder.subscribe();
+ Consumer<byte[]> consumer = consumerBuilder.subscribe();
return new PulsarConsumerWrapper(consumer, topic);
} catch (PulsarClientException e) {
@@ -373,22 +375,25 @@ public class PulsarActionUtils {
private static class PulsarConsumerWrapper implements
MessageQueueSchemaUtils.ConsumerWrapper {
- private final Consumer<String> consumer;
+ private final Consumer<byte[]> consumer;
private final String topic;
- PulsarConsumerWrapper(Consumer<String> consumer, String topic) {
+ PulsarConsumerWrapper(Consumer<byte[]> consumer, String topic) {
this.consumer = consumer;
this.topic = topic;
}
@Override
- public List<String> getRecords(int pollTimeOutMills) {
+ public List<CdcSourceRecord> getRecords(int pollTimeOutMills) {
try {
- Message<String> message = consumer.receive(pollTimeOutMills,
TimeUnit.MILLISECONDS);
+ Message<byte[]> message = consumer.receive(pollTimeOutMills,
TimeUnit.MILLISECONDS);
+ CdcJsonDeserializationSchema deserializationSchema =
+ new CdcJsonDeserializationSchema();
return message == null
? Collections.emptyList()
- : Collections.singletonList(message.getValue());
- } catch (PulsarClientException e) {
+ : Collections.singletonList(
+
deserializationSchema.deserialize(message.getValue()));
+ } catch (IOException e) {
throw new RuntimeException(e);
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcDebeziumDeserializationSchema.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcDebeziumDeserializationSchema.java
new file mode 100644
index 000000000..9ce39ad29
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcDebeziumDeserializationSchema.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.serialization;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
+
+/**
+ * A JSON format implementation of {@link DebeziumDeserializationSchema} which
deserializes the
+ * received {@link SourceRecord} to {@link CdcSourceRecord}.
+ */
+public class CdcDebeziumDeserializationSchema
+ implements DebeziumDeserializationSchema<CdcSourceRecord> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient JsonConverter jsonConverter;
+
+ /**
+ * Configuration whether to enable {@link
JsonConverterConfig#SCHEMAS_ENABLE_CONFIG} to include
+ * schema in messages.
+ */
+ private final Boolean includeSchema;
+
+ /** The custom configurations for {@link JsonConverter}. */
+ private final Map<String, Object> customConverterConfigs;
+
+ public CdcDebeziumDeserializationSchema() {
+ this(false);
+ }
+
+ public CdcDebeziumDeserializationSchema(Boolean includeSchema) {
+ this(includeSchema, null);
+ }
+
+ public CdcDebeziumDeserializationSchema(
+ Boolean includeSchema, Map<String, Object> customConverterConfigs)
{
+ this.includeSchema = includeSchema;
+ this.customConverterConfigs = customConverterConfigs;
+ }
+
+ @Override
+ public void deserialize(SourceRecord record, Collector<CdcSourceRecord>
out) throws Exception {
+ if (jsonConverter == null) {
+ initializeJsonConverter();
+ }
+ byte[] bytes =
+ jsonConverter.fromConnectData(record.topic(),
record.valueSchema(), record.value());
+ out.collect(new CdcSourceRecord(record.topic(), null, new
String(bytes)));
+ }
+
+ /** Initialize {@link JsonConverter} with given configs. */
+ private void initializeJsonConverter() {
+ jsonConverter = new JsonConverter();
+ final HashMap<String, Object> configs = new HashMap<>(2);
+ configs.put(ConverterConfig.TYPE_CONFIG,
ConverterType.VALUE.getName());
+ configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, includeSchema);
+ if (customConverterConfigs != null) {
+ configs.putAll(customConverterConfigs);
+ }
+ jsonConverter.configure(configs);
+ }
+
+ @Override
+ public TypeInformation<CdcSourceRecord> getProducedType() {
+ return getForClass(CdcSourceRecord.class);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcJsonDeserializationSchema.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcJsonDeserializationSchema.java
new file mode 100644
index 000000000..07461cd49
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/serialization/CdcJsonDeserializationSchema.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.serialization;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
+
+/**
+ * A simple deserialization schema for {@link CdcSourceRecord}: each message is parsed as JSON and
+ * the resulting {@link JsonNode} becomes the record value.
+ */
+public class CdcJsonDeserializationSchema implements DeserializationSchema<CdcSourceRecord> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(CdcJsonDeserializationSchema.class);
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    /** Creates a schema whose mapper reads floats as BigDecimal and ignores unknown properties. */
+    public CdcJsonDeserializationSchema() {
+        objectMapper
+                .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true)
+                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    /**
+     * Parses {@code message} as JSON.
+     *
+     * @param message raw message bytes; may be {@code null}
+     * @return the parsed record, or {@code null} when {@code message} is {@code null}
+     * @throws IOException if the payload cannot be parsed (the raw payload is logged first)
+     */
+    @Override
+    public CdcSourceRecord deserialize(byte[] message) throws IOException {
+        if (message == null) {
+            return null;
+        }
+
+        try {
+            return new CdcSourceRecord(objectMapper.readValue(message, JsonNode.class));
+        } catch (Exception e) {
+            // Decode as UTF-8 (the JSON interchange encoding) rather than the platform default
+            // charset, so the logged payload is not garbled on non-UTF-8 hosts.
+            LOG.error(
+                    "Invalid Json:\n{}",
+                    new String(message, java.nio.charset.StandardCharsets.UTF_8));
+            throw e;
+        }
+    }
+
+    @Override
+    public boolean isEndOfStream(CdcSourceRecord nextElement) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<CdcSourceRecord> getProducedType() {
+        return getForClass(CdcSourceRecord.class);
+    }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
index 286d5ffaa..2099287a6 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcTimestampExtractorFactory.java
@@ -18,9 +18,11 @@
package org.apache.paimon.flink.action.cdc.watermark;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.utils.JsonSerdeUtil;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
@@ -65,8 +67,8 @@ public class CdcTimestampExtractorFactory implements
Serializable {
private static final long serialVersionUID = 1L;
@Override
- public long extractTimestamp(String record) throws
JsonProcessingException {
- return JsonSerdeUtil.extractValue(record, Long.class, "ts_ms");
+ public long extractTimestamp(CdcSourceRecord record) throws
JsonProcessingException {
+ // NOTE(review): this cast assumes the source emits CdcSourceRecord values that are JsonNode.
+ // CdcDebeziumDeserializationSchema (used by the MySQL/Postgres sources in this patch) emits
+ // String values; if this extractor's source is built with it too, the cast throws
+ // ClassCastException. Confirm against the deserializer used by this source.
+ return JsonSerdeUtil.extractValue((JsonNode) record.getValue(),
Long.class, "ts_ms");
}
}
@@ -76,7 +78,9 @@ public class CdcTimestampExtractorFactory implements
Serializable {
private static final long serialVersionUID = 1L;
@Override
- public long extractTimestamp(String record) throws
JsonProcessingException {
+ public long extractTimestamp(CdcSourceRecord cdcSourceRecord)
+ throws JsonProcessingException {
+ JsonNode record = (JsonNode) cdcSourceRecord.getValue();
if (JsonSerdeUtil.isNodeExists(record, "mysqlType")) {
// Canal json
return JsonSerdeUtil.extractValue(record, Long.class, "ts");
@@ -108,14 +112,15 @@ public class CdcTimestampExtractorFactory implements
Serializable {
public static class MysqlCdcTimestampExtractor implements
CdcTimestampExtractor {
@Override
- public long extractTimestamp(String record) throws
JsonProcessingException {
- return JsonSerdeUtil.extractValue(record, Long.class, "payload",
"ts_ms");
+ public long extractTimestamp(CdcSourceRecord record) throws
JsonProcessingException {
+ return JsonSerdeUtil.extractValue(
+ (JsonNode) record.getValue(), Long.class, "payload",
"ts_ms");
}
}
/** Interface defining the contract for CDC timestamp extraction. */
public interface CdcTimestampExtractor extends Serializable {
- long extractTimestamp(String record) throws JsonProcessingException;
+ long extractTimestamp(CdcSourceRecord record) throws
JsonProcessingException;
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java
index 218d45d63..41816b0ce 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/watermark/CdcWatermarkStrategy.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action.cdc.watermark;
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import
org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractorFactory.CdcTimestampExtractor;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -32,7 +33,7 @@ import
org.apache.flink.api.common.eventtime.WatermarkStrategy;
* Watermark strategy for CDC sources, generating watermarks based on
timestamps extracted from
* records.
*/
-public class CdcWatermarkStrategy implements WatermarkStrategy<String> {
+public class CdcWatermarkStrategy implements
WatermarkStrategy<CdcSourceRecord> {
private final CdcTimestampExtractor timestampExtractor;
private static final long serialVersionUID = 1L;
@@ -43,12 +44,12 @@ public class CdcWatermarkStrategy implements
WatermarkStrategy<String> {
}
@Override
- public WatermarkGenerator<String> createWatermarkGenerator(
+ public WatermarkGenerator<CdcSourceRecord> createWatermarkGenerator(
WatermarkGeneratorSupplier.Context context) {
- return new WatermarkGenerator<String>() {
+ return new WatermarkGenerator<CdcSourceRecord>() {
@Override
- public void onEvent(String record, long timestamp, WatermarkOutput
output) {
+ public void onEvent(CdcSourceRecord record, long timestamp,
WatermarkOutput output) {
long tMs;
try {
tMs = timestampExtractor.extractTimestamp(record);
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
index 5785f680a..5bb63b6a8 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
@@ -765,10 +765,8 @@ public class KafkaSyncTableActionITCase extends
KafkaActionITCaseBase {
+ "1.000011, 2.000022, 3.000033, "
+ "1.000111, 2.000222, 3.000333, "
+ "12345.110, 12345.220, 12345.330, "
- // TODO fix FIXED
- + "1.2345678987654322E32,
1.2345678987654322E32, 1.2345678987654322E32, "
- // TODO fix BIG DECIMAL
- + "11111, 22222, 33333,
2222222222222222400000000000.0000000000, "
+ + "123456789876543212345678987654321.11,
123456789876543212345678987654321.22, 123456789876543212345678987654321.33, "
+ + "11111, 22222, 33333,
2222222222222222300000001111.1234567890, "
+ "19439, "
// display value of datetime is not affected
by timezone
+ "2023-03-23T14:30:05,
2023-03-23T14:30:05.123, 2023-03-23T14:30:05.123456, "