openinx commented on a change in pull request #3501: URL: https://github.com/apache/iceberg/pull/3501#discussion_r745401341
########## File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/Position.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; Review comment: The class `org.apache.iceberg.flink.Position` is not put in the correct package `org.apache.iceberg.flink.source` ? ########## File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.split; + +import java.io.Serializable; +import java.util.Collection; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.flink.source.Position; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; + +@Internal +public class IcebergSourceSplit implements SourceSplit, Serializable { + private final CombinedScanTask task; + /** + * Position field is mutable + */ + @Nullable + private final Position position; + + /** + * The splits are frequently serialized into checkpoints. + * Caching the byte representation makes repeated serialization cheap. 
+ */ + @Nullable + private transient byte[] serializedFormCache; + + public IcebergSourceSplit(CombinedScanTask task, Position position) { + this.task = task; + this.position = position; + } + + public static IcebergSourceSplit fromCombinedScanTask(CombinedScanTask combinedScanTask) { + return fromCombinedScanTask(combinedScanTask, 0, 0L); + } + + public static IcebergSourceSplit fromCombinedScanTask( + CombinedScanTask combinedScanTask, int fileOffset, long recordOffset) { + return new IcebergSourceSplit(combinedScanTask, new Position(fileOffset, recordOffset)); + } + + public CombinedScanTask task() { + return task; + } + + public Position position() { + return position; + } + + byte[] serializedFormCache() { + return serializedFormCache; + } + + void serializedFormCache(byte[] cachedBytes) { + this.serializedFormCache = cachedBytes; + } + + @Override + public String splitId() { + return MoreObjects.toStringHelper(this) + .add("files", toString(task.files())) + .toString(); + } + + public void updatePosition(int newFileOffset, long newRecordOffset) { + position.update(newFileOffset, newRecordOffset); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IcebergSourceSplit split = (IcebergSourceSplit) o; + return Objects.equal(splitId(), split.splitId()) && + Objects.equal(position, split.position()); + } + + @Override + public int hashCode() { + return Objects.hashCode(splitId()); Review comment: Using the value of `toString()` to calculate the hashCode is not a good practice: if some `FileScanTask` implementation does not override `toString()`, the default `Object#toString()` is used, which embeds the instance's identity hash code. That makes comparing two `IcebergSourceSplit` instances meaningless. 
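To illustrate the general concern, here is a minimal sketch. `SplitHashDemo`, `OpaqueTask`, `toStringBasedId` and `fieldBasedId` are hypothetical names invented for this example, not classes or methods from the PR or from Iceberg; `OpaqueTask` only stands in for a task implementation that does not override `toString()`. ``` java
import java.util.Arrays;
import java.util.List;

public class SplitHashDemo {
  /** Hypothetical stand-in for a task implementation that does NOT override toString(). */
  static final class OpaqueTask {
    final String path;
    final long start;
    final long length;

    OpaqueTask(String path, long start, long length) {
      this.path = path;
      this.start = start;
      this.length = length;
    }
  }

  /** Builds an id from each task's toString(); here that falls back to Object#toString(). */
  static String toStringBasedId(List<OpaqueTask> tasks) {
    return tasks.toString();
  }

  /** Builds an id from explicit fields, so it does not depend on any toString() override. */
  static String fieldBasedId(List<OpaqueTask> tasks) {
    StringBuilder sb = new StringBuilder();
    for (OpaqueTask task : tasks) {
      sb.append(task.path).append(':').append(task.start).append(':').append(task.length).append(';');
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // Two distinct instances that describe the same file range.
    List<OpaqueTask> first = Arrays.asList(new OpaqueTask("file-a.parquet", 0L, 128L));
    List<OpaqueTask> second = Arrays.asList(new OpaqueTask("file-a.parquet", 0L, 128L));

    // The toString()-based ids look like "[SplitHashDemo$OpaqueTask@<hex>]" and
    // depend on object identity rather than on the file range.
    System.out.println(toStringBasedId(first));
    System.out.println(toStringBasedId(second));

    // The field-based ids are equal because they are derived only from the data.
    System.out.println(fieldBasedId(first).equals(fieldBasedId(second)));
  }
}
``` Deriving the identity from explicit fields (or from the serialized bytes) keeps `equals()` and `hashCode()` stable regardless of how an implementation prints itself.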
########## File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/Position.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; Review comment: When I build the module with the command `./gradlew iceberg-flink:iceberg-flink-1.13:build -x test`, it reports the following error: ``` > Task :iceberg-flink:iceberg-flink-1.13:checkstyleMain [ant:checkstyle] [ERROR] /Users/openinx/software/apache-iceberg/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/Position.java:20:1: Package name is not same as directory. [PackageDeclaration] > Task :iceberg-flink:iceberg-flink-1.13:checkstyleMain FAILED FAILURE: Build failed with an exception. * What went wrong: Execution failed for task ':iceberg-flink:iceberg-flink-1.13:checkstyleMain'. > Checkstyle rule violations were found. See the report at: file:///Users/openinx/software/apache-iceberg/flink/v1.13/flink/build/reports/checkstyle/main.html Checkstyle files with violations: 1 Checkstyle violations by severity: [error:1] ``` It's strange that the Apache Iceberg Travis CI did not report this issue. I think I need to take a look at what's wrong. 
########## File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.split; + +import java.io.Serializable; +import java.util.Collection; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.flink.source.Position; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; + +@Internal +public class IcebergSourceSplit implements SourceSplit, Serializable { + private final CombinedScanTask task; + /** + * Position field is mutable + */ + @Nullable + private final Position position; + + /** + * The splits are frequently serialized into checkpoints. + * Caching the byte representation makes repeated serialization cheap. 
+ */ + @Nullable + private transient byte[] serializedFormCache; + + public IcebergSourceSplit(CombinedScanTask task, Position position) { + this.task = task; + this.position = position; + } + + public static IcebergSourceSplit fromCombinedScanTask(CombinedScanTask combinedScanTask) { + return fromCombinedScanTask(combinedScanTask, 0, 0L); + } + + public static IcebergSourceSplit fromCombinedScanTask( + CombinedScanTask combinedScanTask, int fileOffset, long recordOffset) { + return new IcebergSourceSplit(combinedScanTask, new Position(fileOffset, recordOffset)); + } + + public CombinedScanTask task() { + return task; + } + + public Position position() { + return position; + } + + byte[] serializedFormCache() { + return serializedFormCache; + } + + void serializedFormCache(byte[] cachedBytes) { + this.serializedFormCache = cachedBytes; + } + + @Override + public String splitId() { + return MoreObjects.toStringHelper(this) + .add("files", toString(task.files())) + .toString(); + } + + public void updatePosition(int newFileOffset, long newRecordOffset) { + position.update(newFileOffset, newRecordOffset); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IcebergSourceSplit split = (IcebergSourceSplit) o; + return Objects.equal(splitId(), split.splitId()) && + Objects.equal(position, split.position()); + } + + @Override + public int hashCode() { + return Objects.hashCode(splitId()); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("files", toString(task.files())) + .add("position", position) + .toString(); + } + + private String toString(Collection<FileScanTask> files) { + return Iterables.toString(files.stream().map(fileScanTask -> + MoreObjects.toStringHelper(fileScanTask) + .add("file", fileScanTask.file() != null ? 
+ fileScanTask.file().path().toString() : + "NoFile") Review comment: I think the `FileScanTask#file()` is guaranteed to provide a non-null value, otherwise the `start()` and `length()` won't have any meaning. ########## File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source.split; + +import java.io.Serializable; +import java.util.Collection; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.flink.source.Position; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; + +@Internal +public class IcebergSourceSplit implements SourceSplit, Serializable { + private final CombinedScanTask task; + /** + * Position field is mutable + */ + @Nullable + private final Position position; + + /** + * The splits are frequently serialized into checkpoints. + * Caching the byte representation makes repeated serialization cheap. + */ + @Nullable + private transient byte[] serializedFormCache; + + public IcebergSourceSplit(CombinedScanTask task, Position position) { + this.task = task; + this.position = position; + } + + public static IcebergSourceSplit fromCombinedScanTask(CombinedScanTask combinedScanTask) { + return fromCombinedScanTask(combinedScanTask, 0, 0L); + } + + public static IcebergSourceSplit fromCombinedScanTask( + CombinedScanTask combinedScanTask, int fileOffset, long recordOffset) { + return new IcebergSourceSplit(combinedScanTask, new Position(fileOffset, recordOffset)); + } + + public CombinedScanTask task() { + return task; + } + + public Position position() { + return position; + } + + byte[] serializedFormCache() { + return serializedFormCache; + } + + void serializedFormCache(byte[] cachedBytes) { + this.serializedFormCache = cachedBytes; + } + + @Override + public String splitId() { + return MoreObjects.toStringHelper(this) + .add("files", toString(task.files())) + .toString(); + } + + 
public void updatePosition(int newFileOffset, long newRecordOffset) { + position.update(newFileOffset, newRecordOffset); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IcebergSourceSplit split = (IcebergSourceSplit) o; + return Objects.equal(splitId(), split.splitId()) && + Objects.equal(position, split.position()); + } + + @Override + public int hashCode() { + return Objects.hashCode(splitId()); Review comment: As we have the cached `serializedFormCache`, why not use the lazy approach to get the serialized bytes and calculate its byte array's hashCode ? ########## File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source.split; + +import java.io.Serializable; +import java.util.Collection; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.flink.source.Position; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; + +@Internal +public class IcebergSourceSplit implements SourceSplit, Serializable { + private final CombinedScanTask task; + /** + * Position field is mutable + */ + @Nullable + private final Position position; + + /** + * The splits are frequently serialized into checkpoints. + * Caching the byte representation makes repeated serialization cheap. + */ + @Nullable + private transient byte[] serializedFormCache; + + public IcebergSourceSplit(CombinedScanTask task, Position position) { + this.task = task; + this.position = position; + } + + public static IcebergSourceSplit fromCombinedScanTask(CombinedScanTask combinedScanTask) { + return fromCombinedScanTask(combinedScanTask, 0, 0L); + } + + public static IcebergSourceSplit fromCombinedScanTask( + CombinedScanTask combinedScanTask, int fileOffset, long recordOffset) { + return new IcebergSourceSplit(combinedScanTask, new Position(fileOffset, recordOffset)); + } + + public CombinedScanTask task() { + return task; + } + + public Position position() { + return position; + } + + byte[] serializedFormCache() { + return serializedFormCache; + } + + void serializedFormCache(byte[] cachedBytes) { + this.serializedFormCache = cachedBytes; + } Review comment: Why not serialize the split lazily, similar to what `Schema` does? 
https://github.com/apache/iceberg/blob/1d8164383baf0e343ade13b21ed0e4d4d02c56cd/api/src/main/java/org/apache/iceberg/Schema.java#L137 Setting the `cachedBytes` looks a bit strange to me. ########## File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/Position.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; Review comment: After reading this [PR](https://github.com/apache/iceberg/pull/685), I think it's reasonable to disable the baseline check when running the unit tests in a local IDE, but it does not make sense to disable it in GitHub Travis CI, because that lets us merge PRs that introduce basic checkstyle issues. FYI @rdblue ########## File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitStatus.java ########## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source.split; + +public enum IcebergSourceSplitStatus { Review comment: Is this class related to this source split ? ########## File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/Position.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source; Review comment: When I copied the command from flink-ci.yaml and ran it on the command line, it worked fine without reporting any errors: ```bash ./gradlew -DsparkVersions= -DhiveVersions= -DflinkVersions=1.13 :iceberg-flink:iceberg-flink-1.13:check :iceberg-flink:iceberg-flink-1.13-runtime:check -Pquick=true -x javadoc -x test ``` But once I removed `-Pquick=true`, it detected the checkstyle issue: ``` ./gradlew -DsparkVersions= -DhiveVersions= -DflinkVersions=1.13 :iceberg-flink:iceberg-flink-1.13:check :iceberg-flink:iceberg-flink-1.13-runtime:check -x javadoc -x test > Task :iceberg-flink:iceberg-flink-1.13:checkstyleMain [ant:checkstyle] [ERROR] /Users/openinx/software/apache-iceberg/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/Position.java:20:1: Package name is not same as directory. [PackageDeclaration] > Task :iceberg-flink:iceberg-flink-1.13:checkstyleMain FAILED FAILURE: Build failed with an exception. * What went wrong: Execution failed for task ':iceberg-flink:iceberg-flink-1.13:checkstyleMain'. > Checkstyle rule violations were found. See the report at: file:///Users/openinx/software/apache-iceberg/flink/v1.13/flink/build/reports/checkstyle/main.html Checkstyle files with violations: 1 Checkstyle violations by severity: [error:1] ```
