This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new acf94d7  [FLINK-17802][kafka] Set offset commit only if group id is configured for new Kafka Table source
acf94d7 is described below

commit acf94d7748b08ae56d28214614dec045287e7d26
Author: Leonard Xu <[email protected]>
AuthorDate: Tue May 26 21:27:27 2020 +0800

    [FLINK-17802][kafka] Set offset commit only if group id is configured for new Kafka Table source
    
    This closes #12254
---
 .../connectors/kafka/FlinkKafkaConsumerBase.java   |  2 +-
 .../kafka/table/KafkaDynamicSourceBase.java        |  1 +
 .../kafka/KafkaTableSourceSinkFactoryTestBase.java | 31 +++++++++++-----------
 .../table/KafkaDynamicTableFactoryTestBase.java    | 31 ++++++++++++++++++++++
 4 files changed, 48 insertions(+), 17 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 733011f..e8c0ae4 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -1119,7 +1119,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
        }
 
        @VisibleForTesting
-       boolean getEnableCommitOnCheckpoints() {
+       public boolean getEnableCommitOnCheckpoints() {
                return enableCommitOnCheckpoints;
        }
 
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
index 48f77d9..0c97715 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
@@ -209,6 +209,7 @@ public abstract class KafkaDynamicSourceBase implements 
ScanTableSource {
                                
kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
                                break;
                        }
+               
kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") 
!= null);
                return kafkaConsumer;
        }
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index 218e55d..5e4dc91 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -170,24 +170,23 @@ public abstract class KafkaTableSourceSinkFactoryTestBase 
extends TestLogger {
                final StreamExecutionEnvironmentMock mock = new 
StreamExecutionEnvironmentMock();
                actualKafkaSource.getDataStream(mock);
                
assertTrue(getExpectedFlinkKafkaConsumer().isAssignableFrom(mock.sourceFunction.getClass()));
+               // Test commitOnCheckpoints flag should be true when set 
consumer group.
                assertTrue(((FlinkKafkaConsumerBase) 
mock.sourceFunction).getEnableCommitOnCheckpoints());
+       }
 
-               Properties propsWithoutGroupId = new Properties();
-               propsWithoutGroupId.setProperty("bootstrap.servers", "dummy");
-
-               final KafkaTableSourceBase sourceWithoutGroupId = 
getExpectedKafkaTableSource(
-                       schema,
-                       Optional.of(PROC_TIME),
-                       rowtimeAttributeDescriptors,
-                       fieldMapping,
-                       TOPIC,
-                       propsWithoutGroupId,
-                       deserializationSchema,
-                       StartupMode.LATEST,
-                       new HashMap<>(),
-                       0L);
-
-               sourceWithoutGroupId.getDataStream(mock);
+       @Test
+       public void testTableSourceCommitOnCheckpointsDisabled() {
+               Map<String, String> propertiesMap = new HashMap<>();
+               createKafkaSourceProperties().forEach((k, v) -> {
+                       if (!k.equals("connector.properties.group.id")) {
+                               propertiesMap.put(k, v);
+                       }
+               });
+               final TableSource<?> tableSource = 
TableFactoryService.find(StreamTableSourceFactory.class, propertiesMap)
+                       .createStreamTableSource(propertiesMap);
+               final StreamExecutionEnvironmentMock mock = new 
StreamExecutionEnvironmentMock();
+               // Test commitOnCheckpoints flag should be false when do not 
set consumer group.
+               ((KafkaTableSourceBase) tableSource).getDataStream(mock);
                assertTrue(mock.sourceFunction instanceof 
FlinkKafkaConsumerBase);
                assertFalse(((FlinkKafkaConsumerBase) 
mock.sourceFunction).getEnableCommitOnCheckpoints());
        }
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
index 29d11fa..9ea36ff 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
@@ -61,7 +62,9 @@ import java.util.function.Consumer;
 import static org.apache.flink.util.CoreMatchers.containsCause;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Abstract test base for {@link KafkaDynamicTableFactoryBase}.
@@ -153,6 +156,34 @@ public abstract class KafkaDynamicTableFactoryTestBase 
extends TestLogger {
                final SourceFunctionProvider sourceFunctionProvider = 
(SourceFunctionProvider) provider;
                final SourceFunction<RowData> sourceFunction = 
sourceFunctionProvider.createSourceFunction();
                assertThat(sourceFunction, 
instanceOf(getExpectedConsumerClass()));
+               //  Test commitOnCheckpoints flag should be true when set 
consumer group
+               assertTrue(((FlinkKafkaConsumerBase) 
sourceFunction).getEnableCommitOnCheckpoints());
+       }
+
+       @Test
+       public void testTableSourceCommitOnCheckpointsDisabled() {
+               //Construct table source using options and table source factory
+               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+                       "default",
+                       "default",
+                       "scanTable");
+               Map<String, String> tableOptions = getFullSourceOptions();
+               tableOptions.remove("properties.group.id");
+               CatalogTable catalogTable = 
createKafkaSourceCatalogTable(tableOptions);
+               final DynamicTableSource tableSource = 
FactoryUtil.createTableSource(null,
+                       objectIdentifier,
+                       catalogTable,
+                       new Configuration(),
+                       Thread.currentThread().getContextClassLoader());
+
+               // Test commitOnCheckpoints flag should be false when do not 
set consumer group.
+               assertThat(tableSource, 
instanceOf(KafkaDynamicSourceBase.class));
+               ScanTableSource.ScanRuntimeProvider providerWithoutGroupId = 
((KafkaDynamicSourceBase) tableSource)
+                       
.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+               assertThat(providerWithoutGroupId, 
instanceOf(SourceFunctionProvider.class));
+               final SourceFunctionProvider functionProviderWithoutGroupId = 
(SourceFunctionProvider) providerWithoutGroupId;
+               final SourceFunction<RowData> function = 
functionProviderWithoutGroupId.createSourceFunction();
+               assertFalse(((FlinkKafkaConsumerBase) 
function).getEnableCommitOnCheckpoints());
        }
 
        @Test

Reply via email to