stevenzwu commented on code in PR #7661: URL: https://github.com/apache/iceberg/pull/7661#discussion_r1223342802
########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitComparators.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.split; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * Provides implementations of {@link org.apache.iceberg.flink.source.split.SerializableComparator} + * which could be used for ordering splits. These are used by the {@link + * org.apache.iceberg.flink.source.assigner.OrderedSplitAssignerFactory} and the {@link + * org.apache.iceberg.flink.source.reader.IcebergSourceReader} + */ +public class SplitComparators { + private SplitComparators() {} + + /** Comparator which orders the splits based on the file sequence number of the data files */ + public static SerializableComparator<IcebergSourceSplit> fileSequenceNumber() { + return (IcebergSourceSplit o1, IcebergSourceSplit o2) -> { Review Comment: I thought anonymous class is not serializable. can we add a unit test to confirm? ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/FileSequenceNumberBasedComparator.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.split; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class FileSequenceNumberBasedComparator + implements SerializableComparator<IcebergSourceSplit> { + @Override + public int compare(IcebergSourceSplit o1, IcebergSourceSplit o2) { + Preconditions.checkArgument( + o1.task().files().size() == 1 && o2.task().files().size() == 1, + "Could not compare combined task. Please use 'split-open-file-cost' to prevent combining multiple files to a split"); + + Long seq1 = o1.task().files().iterator().next().file().fileSequenceNumber(); + Long seq2 = o2.task().files().iterator().next().file().fileSequenceNumber(); + + Preconditions.checkNotNull( + seq1, "Invalid file sequence number for {}. Only V2 table format is supported", o1); Review Comment: agree with what you side regarding which split causing the issue. but we will need to call out what exactly is invalid (`null` in this case). maybe print the split in the end of the formatted string ``` Invalid file sequence number: null. Doesn't support splits written with V1 format: %s ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
