This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 73d1d88fd [INLONG-5119][Sort] Import all changelog mode data ingest into Kafka (#5140)
73d1d88fd is described below

commit 73d1d88fd06bf003c06a7fbc8b06b3c4e02be534
Author: Oneal65 <[email protected]>
AuthorDate: Thu Jul 21 16:33:21 2022 +0800

    [INLONG-5119][Sort] Import all changelog mode data ingest into Kafka (#5140)
---
 .../protocol/node/extract/KafkaExtractNode.java    |   4 +-
 .../sort/protocol/node/load/KafkaLoadNode.java     |   3 +-
 .../apache/inlong/sort/kafka/KafkaDynamicSink.java |  19 ++++
 .../sort/kafka/table/KafkaDynamicTableFactory.java |   5 +
 .../inlong/sort/kafka/table/KafkaOptions.java      |  38 +++++++
 .../inlong/sort/parser/KafkaLoadSqlParseTest.java  | 116 +++++++++++++++++++++
 6 files changed, 183 insertions(+), 2 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index a28cfb1d9..f4af269bb 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -118,7 +118,9 @@ public class KafkaExtractNode extends ExtractNode implements Metadata, Serializa
             if (StringUtils.isEmpty(this.primaryKey)) {
                 options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
                 options.put(KafkaConstant.SCAN_STARTUP_MODE, 
kafkaScanStartupMode.getValue());
-                options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, 
scanSpecificOffsets);
+                if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
+                    options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS, 
scanSpecificOffsets);
+                }
                 options.putAll(format.generateOptions(false));
             } else {
                 options.put(KafkaConstant.CONNECTOR, 
KafkaConstant.UPSERT_KAFKA);
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
index 5c6e68137..18132f4e9 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
@@ -112,7 +112,8 @@ public class KafkaLoadNode extends LoadNode implements Metadata, Serializable {
         }
         if (format instanceof JsonFormat || format instanceof AvroFormat || 
format instanceof CsvFormat) {
             if (StringUtils.isEmpty(this.primaryKey)) {
-                options.put("connector", "kafka");
+                options.put("connector", "kafka-inlong");
+                options.put("sink.ignore.changelog", "true");
                 options.putAll(format.generateOptions(false));
             } else {
                 options.put("connector", "upsert-kafka");
diff --git 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
index 385609ead..f2d42d3a2 100644
--- 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
+++ 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.connectors.kafka.table.BufferedUpsertSinkFunct
 import org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic;
 import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -39,6 +40,8 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
 import 
org.apache.inlong.sort.kafka.DynamicKafkaSerializationSchema.MetadataConverter;
 import org.apache.kafka.common.header.Header;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
@@ -53,12 +56,14 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.inlong.sort.kafka.table.KafkaOptions.KAFKA_IGNORE_ALL_CHANGELOG;
 
 /**
  * A version-agnostic Kafka {@link DynamicTableSink}.
  */
 @Internal
 public class KafkaDynamicSink implements DynamicTableSink, 
SupportsWritingMetadata {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaDynamicSink.class);
 
     // 
--------------------------------------------------------------------------------------------
     // Mutable attributes
@@ -101,6 +106,11 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
      */
     protected final Properties properties;
 
+    /**
+     * CatalogTable for KAFKA_IGNORE_ALL_CHANGELOG
+     */
+    private final CatalogTable catalogTable;
+
     // 
--------------------------------------------------------------------------------------------
     // Kafka-specific attributes
     // 
--------------------------------------------------------------------------------------------
@@ -147,6 +157,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
             @Nullable String keyPrefix,
             String topic,
             Properties properties,
+            CatalogTable table,
             @Nullable FlinkKafkaPartitioner<RowData> partitioner,
             KafkaSinkSemantic semantic,
             boolean upsertMode,
@@ -168,6 +179,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
         // Kafka-specific attributes
         this.topic = checkNotNull(topic, "Topic must not be null.");
         this.properties = checkNotNull(properties, "Properties must not be 
null.");
+        this.catalogTable = table;
         this.partitioner = partitioner;
         this.semantic = checkNotNull(semantic, "Semantic must not be null.");
         this.upsertMode = upsertMode;
@@ -181,6 +193,12 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
 
     @Override
     public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        if 
(org.apache.flink.configuration.Configuration.fromMap(catalogTable.getOptions())
+                .get(KAFKA_IGNORE_ALL_CHANGELOG)) {
+            LOG.warn("Kafka sink receive all changelog record. "
+                    + "Regard any other record as insert-only record.");
+            return ChangelogMode.all();
+        }
         return valueEncodingFormat.getChangelogMode();
     }
 
@@ -265,6 +283,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
                         keyPrefix,
                         topic,
                         properties,
+                        catalogTable,
                         partitioner,
                         semantic,
                         upsertMode,
diff --git 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index ea41c312f..97ffe1ec4 100644
--- 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -87,6 +87,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.get
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSinkOptions;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions;
 import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static 
org.apache.inlong.sort.kafka.table.KafkaOptions.KAFKA_IGNORE_ALL_CHANGELOG;
 
 /**
  * Copy from org.apache.flink:flink-connector-kafka_2.11:1.13.5
@@ -205,6 +206,7 @@ public class KafkaDynamicTableFactory
         options.add(SINK_PARTITIONER);
         options.add(SINK_SEMANTIC);
         options.add(SINK_PARALLELISM);
+        options.add(KAFKA_IGNORE_ALL_CHANGELOG);
         return options;
     }
 
@@ -305,6 +307,7 @@ public class KafkaDynamicTableFactory
                 keyPrefix,
                 tableOptions.get(TOPIC).get(0),
                 getKafkaProperties(context.getCatalogTable().getOptions()),
+                context.getCatalogTable(),
                 getFlinkKafkaPartitioner(tableOptions, 
context.getClassLoader()).orElse(null),
                 getSinkSemantic(tableOptions),
                 parallelism);
@@ -350,6 +353,7 @@ public class KafkaDynamicTableFactory
             @Nullable String keyPrefix,
             String topic,
             Properties properties,
+            CatalogTable table,
             FlinkKafkaPartitioner<RowData> partitioner,
             KafkaSinkSemantic semantic,
             Integer parallelism) {
@@ -363,6 +367,7 @@ public class KafkaDynamicTableFactory
                 keyPrefix,
                 topic,
                 properties,
+                table,
                 partitioner,
                 semantic,
                 false,
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaOptions.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaOptions.java
new file mode 100644
index 000000000..76a66c500
--- /dev/null
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Option utils for Kafka table source sink. */
+public class KafkaOptions {
+    private KafkaOptions() {
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Sink specific options
+    // 
--------------------------------------------------------------------------------------------
+    public static final ConfigOption<Boolean> KAFKA_IGNORE_ALL_CHANGELOG =
+            ConfigOptions.key("sink.ignore.changelog")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Regard upsert delete as insert kind.");
+
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
new file mode 100644
index 000000000..175da2264
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
@@ -0,0 +1,116 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Test for kafka load sql parse
+ */
+public class KafkaLoadSqlParseTest extends AbstractTestBase {
+
+    private MySqlExtractNode buildMysqlExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("age", new 
StringFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()));
+        Map<String, String> map = new HashMap<>();
+        return new MySqlExtractNode("1", "mysql_input", fields,
+                null, map, "age",
+                Collections.singletonList("user"), "localhost", "root", 
"888888",
+                "test", null, null,
+                true, null);
+    }
+
+    private KafkaLoadNode buildKafkaLoadNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("age", new 
StringFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()));
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("age", new 
StringFormatInfo()),
+                                new FieldInfo("age", new StringFormatInfo())),
+                        new FieldRelation(new FieldInfo("name", new 
StringFormatInfo()),
+                                new FieldInfo("name", new 
StringFormatInfo())));
+        return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
+                null, "topic_output", "localhost:9092", new JsonFormat(),
+                null, null, null);
+    }
+
+    /**
+     * build node relation
+     *
+     * @param inputs  extract node
+     * @param outputs load node
+     * @return node relation
+     */
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> 
outputs) {
+        List<String> inputIds = 
inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = 
outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelation(inputIds, outputIds);
+    }
+
+    /**
+     * Test mysql to kafka
+     *
+     * @throws Exception The exception may be thrown when executing
+     */
+    @Test
+    public void testKafkaSourceSqlParse() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
+        Node inputNode = buildMysqlExtractNode();
+        Node outputNode = buildKafkaLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, 
outputNode),
+                
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", 
Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, 
groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+}

Reply via email to