stevenzwu commented on code in PR #8553: URL: https://github.com/apache/iceberg/pull/8553#discussion_r1336108149
########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/eventtimeextractor/IcebergEventTimeExtractor.java: ########## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.eventtimeextractor; + +import java.io.Serializable; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; + +/** The interface used to extract watermarks and event timestamps from splits and records. */ +public interface IcebergEventTimeExtractor<T> extends Serializable { Review Comment: do we need this interface? can we just use `TimestampAssigner` from Flink? I know you need two assigners. it could be 2 separate impls classes: one for split timestamp extractor and another for record timestamp extractor ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/eventtimeextractor/IcebergTimestampEventTimeExtractor.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.eventtimeextractor; + +import java.io.Serializable; +import java.util.Comparator; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +/** + * {@link IcebergEventTimeExtractor} implementation which uses an Iceberg timestamp column to get + * the watermarks and the event times for the {@link RowData} read by the reader function. + */ +public class IcebergTimestampEventTimeExtractor + implements IcebergEventTimeExtractor<RowData>, Serializable { + private final int tsFieldId; + private final int tsFieldPos; + + /** + * Creates the extractor. 
+ * + * @param schema The schema of the Table + * @param tsFieldName The timestamp column which should be used as an event time + */ + public IcebergTimestampEventTimeExtractor(Schema schema, String tsFieldName) { + Types.NestedField field = schema.findField(tsFieldName); + Preconditions.checkArgument( + field.type().typeId().equals(Type.TypeID.TIMESTAMP), "Type should be timestamp"); + this.tsFieldId = field.fieldId(); + this.tsFieldPos = FlinkSchemaUtil.convert(schema).getFieldIndex(tsFieldName); Review Comment: `Schema#findField(name)` works with nested field. it seems that `RowType#getFieldIndex(name)` only supports top level column. ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/eventtimeextractor/EventTimeExtractorRecordEmitter.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.eventtimeextractor; Review Comment: do we need a separate `eventtimeextractor` package? should these classes just be put into the `reader` package? 
########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/eventtimeextractor/IcebergTimestampEventTimeExtractor.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.eventtimeextractor; + +import java.io.Serializable; +import java.util.Comparator; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +/** + * {@link IcebergEventTimeExtractor} implementation which uses an Iceberg timestamp column to get + * the watermarks and the event times for the {@link RowData} read by the reader function. + */ +public class IcebergTimestampEventTimeExtractor + implements IcebergEventTimeExtractor<RowData>, Serializable { + private final int tsFieldId; + private final int tsFieldPos; + + /** + * Creates the extractor. 
+ * + * @param schema The schema of the Table + * @param tsFieldName The timestamp column which should be used as an event time + */ + public IcebergTimestampEventTimeExtractor(Schema schema, String tsFieldName) { + Types.NestedField field = schema.findField(tsFieldName); + Preconditions.checkArgument( + field.type().typeId().equals(Type.TypeID.TIMESTAMP), "Type should be timestamp"); + this.tsFieldId = field.fieldId(); + this.tsFieldPos = FlinkSchemaUtil.convert(schema).getFieldIndex(tsFieldName); Review Comment: we can potentially use `RowDataProjection` with the position hierarchy. or we can also consider Flink's `org.apache.flink.table.connector.Projection` ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/eventtimeextractor/EventTimeExtractorRecordEmitter.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.source.eventtimeextractor; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.iceberg.flink.source.reader.RecordAndPosition; +import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Emitter which emits the record with event time and updates the split position. + * + * <p>The Emitter also emits watermarks at the beginning of every split, and sets the event + * timestamp based on the provided {@link IcebergEventTimeExtractor}. + */ +public final class EventTimeExtractorRecordEmitter<T> implements SerializableRecordEmitter<T> { + private static final Logger LOG = LoggerFactory.getLogger(EventTimeExtractorRecordEmitter.class); + private final IcebergEventTimeExtractor timeExtractor; + private String lastSplit = null; + private long watermark; + + public EventTimeExtractorRecordEmitter(IcebergEventTimeExtractor timeExtractor) { + this.timeExtractor = timeExtractor; + } + + @Override + public void emitRecord( + RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) { + if (!split.splitId().equals(lastSplit)) { + long newWatermark = timeExtractor.extractWatermark(split); + if (newWatermark < watermark) { + LOG.warn( + "Watermark decreased. PreviousWM {}, currentWM {}, previousSplit {}, currentSplit {}.", + watermark, + newWatermark, + lastSplit, + split.splitId()); + } + watermark = newWatermark; + output.emitWatermark(new Watermark(watermark)); + lastSplit = split.splitId(); + } + + long eventTime = timeExtractor.extractEventTime(element.record()); + if (eventTime <= watermark) { + LOG.warn( + "Late event arrived. 
PreviousWM {}, split {}, eventTime {}, record {}.", + watermark, + split, + eventTime, + element.record()); + } + + output.collect(element.record(), eventTime); Review Comment: I actually don't think Iceberg source should use the `collect` method, as Iceberg record/row doesn't carry inherent timestamp metadata (like Kafka records may). Iceberg source only needs the split timestamp/watermark extractor. ``` /** * Emit a record with a timestamp. * * <p>Use this method if the source system has timestamps attached to records. Typical examples * would be Logs, PubSubs, or Message Queues, like Kafka or Kinesis, which store a timestamp * with each event. * * <p>The events typically still pass through a {@link TimestampAssigner}, which may decide to * either use this source-provided timestamp, or replace it with a timestamp stored within the * event (for example if the event was a JSON object one could configure aTimestampAssigner that * extracts one of the object's fields and uses that as a timestamp). * * @param record the record to emit. * @param timestamp the timestamp of the record. */ void collect(T record, long timestamp); ``` ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/eventtimeextractor/IcebergEventTimeExtractor.java: ########## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.eventtimeextractor; + +import java.io.Serializable; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; + +/** The interface used to extract watermarks and event timestamps from splits and records. */ +public interface IcebergEventTimeExtractor<T> extends Serializable { Review Comment: Flink `TimestampAssigner` also has a nice Javadoc defines the behavior where timestamp is not available. ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/eventtimeextractor/EventTimeExtractorRecordEmitter.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.source.eventtimeextractor; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.iceberg.flink.source.reader.RecordAndPosition; +import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Emitter which emits the record with event time and updates the split position. + * + * <p>The Emitter also emits watermarks at the beginning of every split, and sets the event + * timestamp based on the provided {@link IcebergEventTimeExtractor}. + */ +public final class EventTimeExtractorRecordEmitter<T> implements SerializableRecordEmitter<T> { + private static final Logger LOG = LoggerFactory.getLogger(EventTimeExtractorRecordEmitter.class); + private final IcebergEventTimeExtractor timeExtractor; + private String lastSplit = null; + private long watermark; + + public EventTimeExtractorRecordEmitter(IcebergEventTimeExtractor timeExtractor) { + this.timeExtractor = timeExtractor; + } + + @Override + public void emitRecord( + RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) { + if (!split.splitId().equals(lastSplit)) { + long newWatermark = timeExtractor.extractWatermark(split); + if (newWatermark < watermark) { + LOG.warn( + "Watermark decreased. PreviousWM {}, currentWM {}, previousSplit {}, currentSplit {}.", + watermark, + newWatermark, + lastSplit, + split.splitId()); + } + watermark = newWatermark; + output.emitWatermark(new Watermark(watermark)); + lastSplit = split.splitId(); + } + + long eventTime = timeExtractor.extractEventTime(element.record()); + if (eventTime <= watermark) { + LOG.warn( Review Comment: this should not be `warn` level. maybe trace level. I am actually not sure we need to log it. 
If we really want something here, it could be a counter metric. ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordEmitters.java: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.reader; + +import org.apache.iceberg.flink.source.eventtimeextractor.EventTimeExtractorRecordEmitter; +import org.apache.iceberg.flink.source.eventtimeextractor.IcebergEventTimeExtractor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provides implementations of {@link SerializableRecordEmitter} which could be used for emitting + * records from an Iceberg split.
These are used by the {@link IcebergSourceReader} + */ +public class RecordEmitters { + private static final Logger LOG = LoggerFactory.getLogger(RecordEmitters.class); + + private RecordEmitters() {} + + public static <T> SerializableRecordEmitter<T> emitter() { Review Comment: typically factory class would expose those factory methods with some specific names like `defaultEmitter` or `eventTimeEmitter` ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/eventtimeextractor/EventTimeExtractorRecordEmitter.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.eventtimeextractor; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.iceberg.flink.source.reader.RecordAndPosition; +import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Emitter which emits the record with event time and updates the split position. 
+ * + * <p>The Emitter also emits watermarks at the beginning of every split, and sets the event + * timestamp based on the provided {@link IcebergEventTimeExtractor}. + */ +public final class EventTimeExtractorRecordEmitter<T> implements SerializableRecordEmitter<T> { Review Comment: does this need to be public? at least annotate with `@Internal` ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverEventTimeExtractor.java: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.source; + +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.source.eventtimeextractor.IcebergTimestampEventTimeExtractor; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.util.StructLikeWrapper; + +public class TestIcebergSourceFailoverEventTimeExtractor extends TestIcebergSourceFailover { + // Increment ts by 60 minutes for each generateRecords batch Review Comment: mismatch: the comment says 60 minutes while the code uses 15 minutes ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java: ########## @@ -56,4 +57,20 @@ public static SerializableComparator<IcebergSourceSplit> fileSequenceNumber() { } }; } + + /** Comparator which orders the splits based on watermark of the splits */ + public static SerializableComparator<IcebergSourceSplit> watermarkComparator( Review Comment: here I think separate split and record timestamp extractors would be cleaner.
########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordEmitterFactory.java: ########## @@ -18,19 +18,14 @@ */ package org.apache.iceberg.flink.source.reader; -import org.apache.flink.api.connector.source.SourceOutput; +import java.io.Serializable; import org.apache.flink.connector.base.source.reader.RecordEmitter; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; -final class IcebergSourceRecordEmitter<T> - implements RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit> { - - IcebergSourceRecordEmitter() {} - - @Override - public void emitRecord( - RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) { - output.collect(element.record()); - split.updatePosition(element.fileOffset(), element.recordOffset()); - } +/** + * Factory defining which {@link org.apache.flink.connector.base.source.reader.RecordEmitter} + * implementation to use. + */ +public interface RecordEmitterFactory<T> extends Serializable { + RecordEmitter<RecordAndPosition<T>, T, IcebergSourceSplit> emitter(); Review Comment: nit: most factory interfaces/classes in Iceberg seem to use the `newEmitter` naming style. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
