lgbo-ustc commented on code in PR #9554: URL: https://github.com/apache/incubator-gluten/pull/9554#discussion_r2080810487
########## gluten-flink/connectors/src/main/java/org/apache/gluten/connectors/kafka/GlutenKafkaSource.java: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.gluten.connectors.kafka; + + import java.util.Properties; + + import org.apache.flink.api.common.typeinfo.TypeInformation; + import org.apache.flink.api.connector.source.Boundedness; + import org.apache.flink.api.connector.source.Source; + import org.apache.flink.api.connector.source.SourceReader; + import org.apache.flink.api.connector.source.SourceReaderContext; + import org.apache.flink.api.connector.source.SplitEnumerator; + import org.apache.flink.api.connector.source.SplitEnumeratorContext; + import org.apache.flink.api.java.typeutils.ResultTypeQueryable; + import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState; + import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer; + import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator; + import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; + import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber; + import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; + import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; + import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer; + import org.apache.flink.core.io.SimpleVersionedSerializer; + import org.apache.flink.streaming.connectors.kafka.config.StartupMode; + import org.apache.flink.table.types.DataType; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + public class GlutenKafkaSource<OUT> implements Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>, ResultTypeQueryable<OUT> { + + private static final Logger LOG = LoggerFactory.getLogger(GlutenKafkaSource.class); + + private final Properties properties; + + private final String format; + + private final DataType outputType; + + private final KafkaRecordDeserializationSchema<OUT> deserializationSchema; + + private final KafkaSubscriber subscriber; + + private final OffsetsInitializer startingOffsetsInitializer; + + private final OffsetsInitializer stoppingOffsetsInitializer; + + private String planNodeId; + + public GlutenKafkaSource( + String planNodeId, + String format, + Properties props, + DataType outpuType, + KafkaRecordDeserializationSchema<OUT> deserializationSchema, + KafkaSubscriber subscriber, + OffsetsInitializer startOffsetsInitializer, + OffsetsInitializer stopOffsetsInitializer) { + this.planNodeId = planNodeId; + this.format = format; + this.properties = props; + this.outputType = outpuType; + this.deserializationSchema = deserializationSchema; + this.subscriber = subscriber; + this.startingOffsetsInitializer = startOffsetsInitializer; + this.stoppingOffsetsInitializer = stopOffsetsInitializer; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.CONTINUOUS_UNBOUNDED; + } + + @Override + public SourceReader<OUT, KafkaPartitionSplit> createReader(SourceReaderContext readerContext) throws Exception { + return new GlutenKafkaSourceReader<OUT>(planNodeId, format, outputType, properties); + } + + @Override + public TypeInformation<OUT> getProducedType() { + return deserializationSchema.getProducedType(); + } + + public void setPlanNodeId(String planNodeId) { + this.planNodeId = planNodeId; + } + + public void setEnableAutoCommitOffset(boolean enabled) { + this.properties.setProperty("enable.auto.commit", String.valueOf(enabled)); + } + + public void setStartupMode(StartupMode mode) { + switch(mode) { + case GROUP_OFFSETS : + this.properties.setProperty("scan.startup.mode", "group-offsets"); + break; + case LATEST: + this.properties.setProperty("scan.startup.mode", "latest-offsets"); + break; + case EARLIEST: + this.properties.setProperty("scan.startup.mode", "earliest-offsets"); + break; + default: + this.properties.setProperty("scan.startup.mode", "group-offsets"); + break; + } + } + + @Override + public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> createEnumerator( + SplitEnumeratorContext<KafkaPartitionSplit> enumContext) throws Exception { + return new KafkaSourceEnumerator( + subscriber, + startingOffsetsInitializer, + stoppingOffsetsInitializer, + properties, + enumContext, + getBoundedness()); + } + + @Override + public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> restoreEnumerator( + SplitEnumeratorContext<KafkaPartitionSplit> enumContext, KafkaSourceEnumState checkpoint) throws Exception { + return new KafkaSourceEnumerator( + subscriber, + startingOffsetsInitializer, + stoppingOffsetsInitializer, + properties, + enumContext, + getBoundedness(), + checkpoint); + } + + @Override + public SimpleVersionedSerializer<KafkaPartitionSplit> getSplitSerializer() { + return new KafkaPartitionSplitSerializer(); + } + + @Override + public SimpleVersionedSerializer<KafkaSourceEnumState> getEnumeratorCheckpointSerializer() { + return new KafkaSourceEnumStateSerializer(); + } + + } Review Comment: add a new line at the end ########## gluten-flink/connectors/src/main/java/org/apache/gluten/connectors/kafka/GlutenKafkaDynamicSource.java: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gluten.connectors.kafka; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Pattern; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber; +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource; +import org.apache.flink.table.connector.ProviderContext; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.source.DataStreamScanProvider; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.formats.raw.RawFormatDeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GlutenKafkaDynamicSource extends KafkaDynamicSource{ + + private static final Logger LOG = LoggerFactory.getLogger(GlutenKafkaDynamicSource.class); + + public GlutenKafkaDynamicSource(DataType physicalDataType, + DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat, + DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat, + int[] keyProjection, + int[] valueProjection, + String keyPrefix, + List<String> topics, + Pattern topicPattern, + Properties properties, + StartupMode startupMode, + Map<KafkaTopicPartition, Long> specificStartupOffsets, + long startupTimestampMillis, + BoundedMode boundedMode, + Map<KafkaTopicPartition, Long> specificBoundedOffsets, + long boundedTimestampMillis, + boolean upsertMode, + String tableIdentifier) { + super(physicalDataType, keyDecodingFormat, valueDecodingFormat, keyProjection, valueProjection, keyPrefix, topics, + topicPattern, properties, startupMode, specificStartupOffsets, startupTimestampMillis, boundedMode, + specificBoundedOffsets, boundedTimestampMillis, upsertMode, tableIdentifier); + } + + public static GlutenKafkaDynamicSource copyFrom(KafkaDynamicSource source) throws Exception { + final GlutenKafkaDynamicSource copy = + new GlutenKafkaDynamicSource( + getFieldFromKafkaSource(source, "physicalDataType", DataType.class), + getFieldFromKafkaSource(source, "keyDecodingFormat", DecodingFormat.class), + getFieldFromKafkaSource(source, "valueDecodingFormat", DecodingFormat.class), + getFieldFromKafkaSource(source, "keyProjection", int[].class), + getFieldFromKafkaSource(source, "valueProjection", int[].class), + getFieldFromKafkaSource(source, "keyPrefix", String.class), + getFieldFromKafkaSource(source, "topics", List.class), + getFieldFromKafkaSource(source, "topicPattern", Pattern.class), + getFieldFromKafkaSource(source, "properties", Properties.class), + getFieldFromKafkaSource(source, "startupMode", StartupMode.class), + getFieldFromKafkaSource(source, "specificStartupOffsets", Map.class), + getFieldFromKafkaSource(source, "startupTimestampMillis", long.class), + getFieldFromKafkaSource(source, "boundedMode", BoundedMode.class), + getFieldFromKafkaSource(source, "specificBoundedOffsets", Map.class), + getFieldFromKafkaSource(source, "boundedTimestampMillis", long.class), + getFieldFromKafkaSource(source, "upsertMode", boolean.class), + getFieldFromKafkaSource(source, "tableIdentifier", String.class)); + copy.producedDataType = getFieldFromKafkaSource(source, "producedDataType", DataType.class); + copy.metadataKeys = getFieldFromKafkaSource(source, "metadataKeys", List.class); + copy.watermarkStrategy = getFieldFromKafkaSource(source, "watermarkStrategy", WatermarkStrategy.class); + return copy; + } + + private static <T> T getFieldValue(Object source, Class<?> sourceClazz, String fieldName, Class<T> fieldClazz) { + try { + Field f = sourceClazz.getDeclaredField(fieldName); + f.setAccessible(true); + Class<?> fClazz = f.getType(); + if (fClazz.equals(fieldClazz)) { + return (T) f.get(source); + } else { + String errMsg = String.format("Faield to get field %s from KafkaDynamimcSource, the field class {} not match with given class: {}", + fieldName, fClazz.getName(), fieldClazz.getName()); + throw new RuntimeException(errMsg); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static <T> T getFieldFromKafkaSource(KafkaDynamicSource source, String fieldName, Class<T> fieldClazz) { + return getFieldValue(source, KafkaDynamicSource.class, fieldName, fieldClazz); + } + + private static <T> T getFieldFromKafkaSource(KafkaSource<?> source, String fieldName, Class<T> fieldClazz) { + return getFieldValue(source, KafkaSource.class, fieldName, fieldClazz); + } + + private static <T> T invokeMethodFromKafkaSource(KafkaDynamicSource source, String methodName, Class<?>[] argClazzs, Object[] args) { + try { + Method m = KafkaDynamicSource.class.getDeclaredMethod(methodName, argClazzs); + if (m != null) { + m.setAccessible(true); + return (T) m.invoke(source, args); + } else { + throw new RuntimeException("Failed to get method name of: " + methodName); + } + } catch (Exception e) { + throw new RuntimeException(e); + } Review Comment: Is the catch here unnecessary? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
