This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 73d1d88fd [INLONG-5119][Sort] Import all changelog mode data ingest
into Kafka (#5140)
73d1d88fd is described below
commit 73d1d88fd06bf003c06a7fbc8b06b3c4e02be534
Author: Oneal65 <[email protected]>
AuthorDate: Thu Jul 21 16:33:21 2022 +0800
[INLONG-5119][Sort] Import all changelog mode data ingest into Kafka (#5140)
---
.../protocol/node/extract/KafkaExtractNode.java | 4 +-
.../sort/protocol/node/load/KafkaLoadNode.java | 3 +-
.../apache/inlong/sort/kafka/KafkaDynamicSink.java | 19 ++++
.../sort/kafka/table/KafkaDynamicTableFactory.java | 5 +
.../inlong/sort/kafka/table/KafkaOptions.java | 38 +++++++
.../inlong/sort/parser/KafkaLoadSqlParseTest.java | 116 +++++++++++++++++++++
6 files changed, 183 insertions(+), 2 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index a28cfb1d9..f4af269bb 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -118,7 +118,9 @@ public class KafkaExtractNode extends ExtractNode
implements Metadata, Serializa
if (StringUtils.isEmpty(this.primaryKey)) {
options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
options.put(KafkaConstant.SCAN_STARTUP_MODE,
kafkaScanStartupMode.getValue());
- options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS,
scanSpecificOffsets);
+ if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
+ options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS,
scanSpecificOffsets);
+ }
options.putAll(format.generateOptions(false));
} else {
options.put(KafkaConstant.CONNECTOR,
KafkaConstant.UPSERT_KAFKA);
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
index 5c6e68137..18132f4e9 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
@@ -112,7 +112,8 @@ public class KafkaLoadNode extends LoadNode implements
Metadata, Serializable {
}
if (format instanceof JsonFormat || format instanceof AvroFormat ||
format instanceof CsvFormat) {
if (StringUtils.isEmpty(this.primaryKey)) {
- options.put("connector", "kafka");
+ options.put("connector", "kafka-inlong");
+ options.put("sink.ignore.changelog", "true");
options.putAll(format.generateOptions(false));
} else {
options.put("connector", "upsert-kafka");
diff --git
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
index 385609ead..f2d42d3a2 100644
---
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
+++
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
@@ -26,6 +26,7 @@ import
org.apache.flink.streaming.connectors.kafka.table.BufferedUpsertSinkFunct
import org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic;
import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -39,6 +40,8 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import
org.apache.inlong.sort.kafka.DynamicKafkaSerializationSchema.MetadataConverter;
import org.apache.kafka.common.header.Header;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -53,12 +56,14 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static
org.apache.inlong.sort.kafka.table.KafkaOptions.KAFKA_IGNORE_ALL_CHANGELOG;
/**
* A version-agnostic Kafka {@link DynamicTableSink}.
*/
@Internal
public class KafkaDynamicSink implements DynamicTableSink,
SupportsWritingMetadata {
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaDynamicSink.class);
//
--------------------------------------------------------------------------------------------
// Mutable attributes
@@ -101,6 +106,11 @@ public class KafkaDynamicSink implements DynamicTableSink,
SupportsWritingMetada
*/
protected final Properties properties;
+ /**
+ * The CatalogTable whose options are inspected for KAFKA_IGNORE_ALL_CHANGELOG
+ */
+ private final CatalogTable catalogTable;
+
//
--------------------------------------------------------------------------------------------
// Kafka-specific attributes
//
--------------------------------------------------------------------------------------------
@@ -147,6 +157,7 @@ public class KafkaDynamicSink implements DynamicTableSink,
SupportsWritingMetada
@Nullable String keyPrefix,
String topic,
Properties properties,
+ CatalogTable table,
@Nullable FlinkKafkaPartitioner<RowData> partitioner,
KafkaSinkSemantic semantic,
boolean upsertMode,
@@ -168,6 +179,7 @@ public class KafkaDynamicSink implements DynamicTableSink,
SupportsWritingMetada
// Kafka-specific attributes
this.topic = checkNotNull(topic, "Topic must not be null.");
this.properties = checkNotNull(properties, "Properties must not be
null.");
+ this.catalogTable = table;
this.partitioner = partitioner;
this.semantic = checkNotNull(semantic, "Semantic must not be null.");
this.upsertMode = upsertMode;
@@ -181,6 +193,12 @@ public class KafkaDynamicSink implements DynamicTableSink,
SupportsWritingMetada
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ if
(org.apache.flink.configuration.Configuration.fromMap(catalogTable.getOptions())
+ .get(KAFKA_IGNORE_ALL_CHANGELOG)) {
+ LOG.warn("Kafka sink receives all changelog records. "
+ + "Any other record is regarded as an insert-only record.");
+ return ChangelogMode.all();
+ }
return valueEncodingFormat.getChangelogMode();
}
@@ -265,6 +283,7 @@ public class KafkaDynamicSink implements DynamicTableSink,
SupportsWritingMetada
keyPrefix,
topic,
properties,
+ catalogTable,
partitioner,
semantic,
upsertMode,
diff --git
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index ea41c312f..97ffe1ec4 100644
---
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -87,6 +87,7 @@ import static
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.get
import static
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSinkOptions;
import static
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static
org.apache.inlong.sort.kafka.table.KafkaOptions.KAFKA_IGNORE_ALL_CHANGELOG;
/**
* Copy from org.apache.flink:flink-connector-kafka_2.11:1.13.5
@@ -205,6 +206,7 @@ public class KafkaDynamicTableFactory
options.add(SINK_PARTITIONER);
options.add(SINK_SEMANTIC);
options.add(SINK_PARALLELISM);
+ options.add(KAFKA_IGNORE_ALL_CHANGELOG);
return options;
}
@@ -305,6 +307,7 @@ public class KafkaDynamicTableFactory
keyPrefix,
tableOptions.get(TOPIC).get(0),
getKafkaProperties(context.getCatalogTable().getOptions()),
+ context.getCatalogTable(),
getFlinkKafkaPartitioner(tableOptions,
context.getClassLoader()).orElse(null),
getSinkSemantic(tableOptions),
parallelism);
@@ -350,6 +353,7 @@ public class KafkaDynamicTableFactory
@Nullable String keyPrefix,
String topic,
Properties properties,
+ CatalogTable table,
FlinkKafkaPartitioner<RowData> partitioner,
KafkaSinkSemantic semantic,
Integer parallelism) {
@@ -363,6 +367,7 @@ public class KafkaDynamicTableFactory
keyPrefix,
topic,
properties,
+ table,
partitioner,
semantic,
false,
diff --git
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaOptions.java
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaOptions.java
new file mode 100644
index 000000000..76a66c500
--- /dev/null
+++
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Option utils for Kafka table source sink. */
+public class KafkaOptions {
+ private KafkaOptions() {
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Sink specific options
+ //
--------------------------------------------------------------------------------------------
+ public static final ConfigOption<Boolean> KAFKA_IGNORE_ALL_CHANGELOG =
+ ConfigOptions.key("sink.ignore.changelog")
+ .booleanType()
+ .defaultValue(false)
.withDescription("Regard upsert/delete records as insert kind.");
+
+}
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
new file mode 100644
index 000000000..175da2264
--- /dev/null
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Test for kafka load sql parse
+ */
+public class KafkaLoadSqlParseTest extends AbstractTestBase {
+
+ private MySqlExtractNode buildMysqlExtractNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("age", new
StringFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()));
+ Map<String, String> map = new HashMap<>();
+ return new MySqlExtractNode("1", "mysql_input", fields,
+ null, map, "age",
+ Collections.singletonList("user"), "localhost", "root",
"888888",
+ "test", null, null,
+ true, null);
+ }
+
+ private KafkaLoadNode buildKafkaLoadNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("age", new
StringFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()));
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("age", new
StringFormatInfo()),
+ new FieldInfo("age", new StringFormatInfo())),
+ new FieldRelation(new FieldInfo("name", new
StringFormatInfo()),
+ new FieldInfo("name", new
StringFormatInfo())));
+ return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
+ null, "topic_output", "localhost:9092", new JsonFormat(),
+ null, null, null);
+ }
+
+ /**
+ * build node relation
+ *
+ * @param inputs extract node
+ * @param outputs load node
+ * @return node relation
+ */
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node>
outputs) {
+ List<String> inputIds =
inputs.stream().map(Node::getId).collect(Collectors.toList());
+ List<String> outputIds =
outputs.stream().map(Node::getId).collect(Collectors.toList());
+ return new NodeRelation(inputIds, outputIds);
+ }
+
+ /**
+ * Test mysql to kafka
+ *
+ * @throws Exception The exception may be thrown when executing
+ */
+ @Test
+ public void testKafkaSourceSqlParse() throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ env.disableOperatorChaining();
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
+ Node inputNode = buildMysqlExtractNode();
+ Node outputNode = buildKafkaLoadNode();
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode,
outputNode),
+
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("1",
Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv,
groupInfo);
+ ParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
+}