JingsongLi commented on code in PR #111:
URL: https://github.com/apache/flink-table-store/pull/111#discussion_r869075055


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -122,6 +132,17 @@ public SplitEnumerator<FileStoreSourceSplit, 
PendingSplitsCheckpoint> restoreEnu
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             PendingSplitsCheckpoint checkpoint) {
         FileStoreScan scan = fileStore.newScan();
+        Long snapshotId;

Review Comment:
   revert these two fields?
   They should be in below.



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java:
##########
@@ -61,6 +63,12 @@
 
     @Nullable private final Predicate fieldPredicate;
 
+    /**
+     * The partitioned manifest meta collected at planning phase when manual 
compaction is
+     * triggered.
+     */
+    @Nullable private final PartitionedManifestMeta 
specifiedPartitionedManifestMeta;

Review Comment:
   just a shorter field name? `specifiedPartManifests`?



##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceTest.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import 
org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import 
org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.utils.PartitionedManifestMeta;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link FileStoreSource}. */
+public class FileStoreSourceTest {
+
+    private static final RowType RECORD_TYPE =
+            RowType.of(
+                    new LogicalType[] {
+                        new IntType(), new VarCharType(), new DoubleType(), 
new VarCharType()
+                    },
+                    new String[] {"k0", "k1", "v0", "v1"});
+
+    @MethodSource("parameters")
+    @ParameterizedTest
+    public void testSerDe(boolean hasPk, boolean partitioned, boolean 
specified)
+            throws ClassNotFoundException, IOException {
+        FileStore fileStore = buildFileStore(hasPk, partitioned);
+        Long specifiedSnapshotId = specified ? 1L : null;
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
specifiedManifestEntries =
+                specified ? buildManifestEntries(hasPk, partitioned) : null;
+        PartitionedManifestMeta partitionedManifestMeta =
+                specified
+                        ? new PartitionedManifestMeta(
+                                specifiedSnapshotId,
+                                specifiedManifestEntries,
+                                getPartitionType(partitioned).getFieldCount(),
+                                getKeyType(hasPk),
+                                getValueType(hasPk))
+                        : null;
+        FileStoreSource source =
+                new FileStoreSource(
+                        fileStore,
+                        !hasPk,
+                        true,
+                        Duration.ofSeconds(1).toMillis(),
+                        true,
+                        null,
+                        null,
+                        null,
+                        partitionedManifestMeta);
+        Object object = readObject(writeObject(source));
+        assertThat(object).isInstanceOf(FileStoreSource.class);
+        FileStoreSource deserialized = (FileStoreSource) object;
+        
assertThat(deserialized.getBoundedness()).isEqualTo(source.getBoundedness());
+        if (specified) {
+            assertThat(deserialized.getSpecifiedPartitionedManifestMeta())
+                    .isEqualTo(source.getSpecifiedPartitionedManifestMeta());
+        } else {
+            
assertThat(deserialized.getSpecifiedPartitionedManifestMeta()).isNull();
+        }
+    }
+
+    private byte[] writeObject(FileStoreSource source) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(source);
+        oos.close();
+        return baos.toByteArray();
+    }
+
+    private Object readObject(byte[] bytes) throws IOException, 
ClassNotFoundException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+        ObjectInputStream ois = new ObjectInputStream(bais);
+        Object object = ois.readObject();
+        ois.close();
+        return object;
+    }
+
+    public static Stream<Arguments> parameters() {
+        // hasPk, partitioned, specified
+        return Stream.of(
+                Arguments.of(true, true, false),
+                Arguments.of(true, false, false),
+                Arguments.of(false, false, false),
+                Arguments.of(false, true, false),
+                Arguments.of(true, true, true),
+                Arguments.of(true, false, true),
+                Arguments.of(false, false, true),
+                Arguments.of(false, true, true));
+    }
+
+    private static FileStore buildFileStore(boolean hasPk, boolean 
partitioned) {
+        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat", "db", 
"tbl");
+        String user = "user";
+        RowType partitionType = getPartitionType(partitioned);
+        RowType keyType = getKeyType(hasPk);
+        RowType valueType = getValueType(hasPk);
+        MergeFunction mergeFunction =
+                hasPk ? new DeduplicateMergeFunction() : new 
ValueCountMergeFunction();
+        return new FileStoreImpl(
+                tableIdentifier,
+                new Configuration(),
+                user,
+                partitionType,
+                keyType,
+                valueType,
+                mergeFunction);
+    }
+
+    private static Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
buildManifestEntries(
+            boolean hasPk, boolean partitioned) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> manifestEntries = 
new HashMap<>();
+        Map<Integer, List<DataFileMeta>> bucketEntries = new HashMap<>();
+        int totalBuckets = new Random().nextInt(10) + 1;
+        IntStream.range(0, totalBuckets)
+                .forEach(
+                        bucket ->
+                                bucketEntries.put(
+                                        bucket,
+                                        genMinMax(hasPk).stream()
+                                                .map(
+                                                        tuple -> {

Review Comment:
   This format looks difficult, can we switch to using foreach?
   It is not recommended to use too complex logic in lambda.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/PartitionedManifestMeta.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFileMetaSerializer;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/** Manifest entries per partitioned with the corresponding snapshot id. */
+public class PartitionedManifestMeta implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The latest snapshot id seen at planning phase when manual compaction 
is triggered. */
+    private final Long snapshotId;
+
+    /** The manifest entries collected at planning phase when manual 
compaction is triggered. */
+    private transient Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
manifestEntries;
+
+    private final int partFieldCount;
+    private final RowType keyType;
+    private final RowType valueType;
+
+    public PartitionedManifestMeta(
+            Long snapshotId,
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
specifiedManifestEntries,
+            int partFieldCount,
+            RowType keyType,
+            RowType valueType) {
+        Preconditions.checkNotNull(snapshotId, "Specified snapshot should not 
be null.");
+        Preconditions.checkNotNull(
+                specifiedManifestEntries, "Specified manifest entries should 
not be null.");
+        this.snapshotId = snapshotId;
+        this.manifestEntries = specifiedManifestEntries;
+        this.partFieldCount = partFieldCount;
+        this.keyType = keyType;
+        this.valueType = valueType;
+    }
+
+    public Long getSnapshotId() {
+        return snapshotId;
+    }
+
+    public Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
getManifestEntries() {
+        return manifestEntries;
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+
+        BinaryRowDataSerializer partSerializer = new 
BinaryRowDataSerializer(partFieldCount);
+        DataFileMetaSerializer metaSerializer = new 
DataFileMetaSerializer(keyType, valueType);
+        DataOutputViewStreamWrapper view = new 
DataOutputViewStreamWrapper(out);
+        view.writeInt(manifestEntries.size());
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
partEntry :
+                manifestEntries.entrySet()) {
+            partSerializer.serialize(partEntry.getKey(), view);
+            Map<Integer, List<DataFileMeta>> bucketEntry = 
partEntry.getValue();
+            view.writeInt(bucketEntry.size());
+            for (Map.Entry<Integer, List<DataFileMeta>> entry : 
bucketEntry.entrySet()) {
+                view.writeInt(entry.getKey());
+                view.writeInt(entry.getValue().size());
+                for (DataFileMeta meta : entry.getValue()) {
+                    metaSerializer.serialize(meta, view);
+                }
+            }
+        }
+    }
+
+    private void readObject(ObjectInputStream in) throws 
ClassNotFoundException, IOException {
+        in.defaultReadObject();
+
+        manifestEntries = new HashMap<>();
+        BinaryRowDataSerializer partSerializer = new 
BinaryRowDataSerializer(partFieldCount);
+        DataFileMetaSerializer metaSerializer = new 
DataFileMetaSerializer(keyType, valueType);
+        DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(in);
+        int partitionNum = view.readInt();
+        while (partitionNum > 0) {

Review Comment:
   Rather than maintaining a `partitionNum` variable, I find the `for (int i = 
0....)` is more straightforward



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to