mxm commented on code in PR #44: URL: https://github.com/apache/flink-connector-kafka/pull/44#discussion_r1363867482
########## flink-connector-kafka/src/test/resources/log4j2-test.properties: ########## @@ -36,3 +36,12 @@ logger.zookeeper.name = org.apache.zookeeper logger.zookeeper.level = OFF logger.I0Itec.name = org.I0Itec logger.I0Itec.level = OFF + +logger.DynamicKafkaSourceReader.name = org.apache.flink.connector.kafka.source.reader.DynamicKafkaSourceReader +logger.DynamicKafkaSourceReader.level = OFF + +logger.kafkaSourceReader.name = org.apache.flink.connector.kafka.source.reader.KafkaSourceReader +logger.kafkaSourceReader.level = OFF + +logger.SourceReaderBase.name = org.apache.flink.connector.base.source.reader.SourceReaderBase +logger.SourceReaderBase.level = OFF Review Comment: Why are these disabled for tests? ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/DynamicKafkaSourceOptions.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Streams; + +import java.util.Properties; +import java.util.function.Function; + +/** + * The connector options for {@link DynamicKafkaSource} that can be passed through the source + * properties e.g. {@link DynamicKafkaSourceBuilder#setProperties(Properties)}. + */ +@Internal +public class DynamicKafkaSourceOptions { + + private DynamicKafkaSourceOptions() {} + + public static final ConfigOption<Long> STREAM_METADATA_DISCOVERY_INTERVAL_MS = + ConfigOptions.key("stream-metadata-discovery-interval-ms") + .longType() + .noDefaultValue() + .withDescription( + "The interval in milliseconds for the sink to discover " + + "the changes in stream metadata. A non-positive value disables the stream metadata discovery."); Review Comment: Long defaults to zero which could be seen as a positive value. Would set a default value of `-1` here. ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/DynamicKafkaSource.java: ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.kafka.source.enumerator.DynamicKafkaSourceEnumState; +import org.apache.flink.connector.kafka.source.enumerator.DynamicKafkaSourceEnumStateSerializer; +import org.apache.flink.connector.kafka.source.enumerator.DynamicKafkaSourceEnumerator; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.connector.kafka.source.enumerator.metadata.KafkaMetadataService; +import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaStreamSubscriber; +import org.apache.flink.connector.kafka.source.reader.DynamicKafkaSourceReader; +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; +import org.apache.flink.connector.kafka.source.split.DynamicKafkaSourceSplit; +import org.apache.flink.connector.kafka.source.split.DynamicKafkaSourceSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.util.Properties; + +/** + * Factory class for the DynamicKafkaSource components. <a + * href="https://cwiki.apache.org/confluence/x/CBn1D">FLIP-246: DynamicKafkaSource</a> + * + * <p>This source's key difference from {@link KafkaSource} is that it enables users to read + * dynamically from topics, which belong to one or more clusters. Review Comment: We should clarify what "dynamically" means. ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/DynamicKafkaSource.java: ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.source; Review Comment: Should this be in a separate package, e.g. `org.apache.flink.connector.kafka.dynamic.source`? This might be useful for differentiating classes from the regular Kafka source. ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/DynamicKafkaSourceOptions.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Streams; + +import java.util.Properties; +import java.util.function.Function; + +/** + * The connector options for {@link DynamicKafkaSource} that can be passed through the source + * properties e.g. {@link DynamicKafkaSourceBuilder#setProperties(Properties)}. + */ +@Internal +public class DynamicKafkaSourceOptions { + + private DynamicKafkaSourceOptions() {} + + public static final ConfigOption<Long> STREAM_METADATA_DISCOVERY_INTERVAL_MS = + ConfigOptions.key("stream-metadata-discovery-interval-ms") + .longType() + .noDefaultValue() + .withDescription( + "The interval in milliseconds for the sink to discover " + + "the changes in stream metadata. A non-positive value disables the stream metadata discovery."); + + public static final ConfigOption<Integer> STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD = + ConfigOptions.key("stream-metadata-discovery-failure-threshold") + .intType() + .defaultValue(1) + .withDescription( + "The number of consecutive failures before letting the exception from Kafka metadata service discovery " + + "trigger jobmanager failure and global failover. The default is one to at least catch startup " + + "failures. This is only implemented for the source"); + + @Internal + public static <T> T getOption( + Properties props, ConfigOption<?> configOption, Function<String, T> parser) { + String value = props.getProperty(configOption.key()); + return (T) (value == null ? configOption.defaultValue() : parser.apply(value)); + } + + @Internal + public static boolean containsOption(Properties props, ConfigOption<?> configOption) { Review Comment: Is this being used anywhere apart from tests? ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/DynamicKafkaSource.java: ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.kafka.source.enumerator.DynamicKafkaSourceEnumState; +import org.apache.flink.connector.kafka.source.enumerator.DynamicKafkaSourceEnumStateSerializer; +import org.apache.flink.connector.kafka.source.enumerator.DynamicKafkaSourceEnumerator; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.connector.kafka.source.enumerator.metadata.KafkaMetadataService; +import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaStreamSubscriber; +import org.apache.flink.connector.kafka.source.reader.DynamicKafkaSourceReader; +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; +import org.apache.flink.connector.kafka.source.split.DynamicKafkaSourceSplit; +import org.apache.flink.connector.kafka.source.split.DynamicKafkaSourceSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.util.Properties; + +/** + * Factory class for the DynamicKafkaSource components. <a + * href="https://cwiki.apache.org/confluence/x/CBn1D">FLIP-246: DynamicKafkaSource</a> + * + * <p>This source's key difference from {@link KafkaSource} is that it enables users to read + * dynamically from topics, which belong to one or more clusters. + * + * @param <T> Record type + */ +@PublicEvolving +public class DynamicKafkaSource<T> Review Comment: FYI I see some references still in the FLIP which state `MultiClusterKafkaSource`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
