JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884379183


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java:
##########
@@ -55,14 +56,14 @@ public FileStoreReadImpl(
             WriteMode writeMode,
             RowType keyType,
             RowType valueType,
-            Comparator<RowData> keyComparator,
+            Supplier<Comparator<RowData>> keyComparatorSupplier,

Review Comment:
   Why would a util method change the `Comparator` to a `Supplier`?
   We shouldn't create it repeatedly unless there is a thread-safety issue here.



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean 
ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new 
HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        
catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", 
catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = 
plan.groupByPartFiles();
+        if 
(!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new 
FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new 
KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        try {
+            newOptions.put(
+                    COMPACTION_SCANNED_MANIFEST.key(),
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    InstantiationUtil.serializeObject(
+                                            new PartitionedManifestMeta(
+                                                    plan.snapshotId(), 
groupBy))));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy,
+            MergeTreeOptions options,
+            Comparator<RowData> keyComparator) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filtered = new 
HashMap<>();
+
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
partEntry :
+                groupBy.entrySet()) {
+            Map<Integer, List<DataFileMeta>> manifests = new HashMap<>();
+            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                    partEntry.getValue().entrySet()) {
+                List<DataFileMeta> smallFiles =
+                        bucketEntry.getValue().stream()
+                                .filter(fileMeta -> fileMeta.fileSize() < 
options.targetFileSize)
+                                .collect(Collectors.toList());
+                List<DataFileMeta> intersectedFiles =
+                        new IntervalPartition(bucketEntry.getValue(), 
keyComparator)
+                                .partition().stream()
+                                        .filter(section -> section.size() > 1)
+                                        .flatMap(Collection::stream)
+                                        .map(SortedRun::files)
+                                        .flatMap(Collection::stream)
+                                        .collect(Collectors.toList());
+
+                List<DataFileMeta> filteredFiles =

Review Comment:
   `bucketFiles`?



##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java:
##########
@@ -256,9 +284,288 @@ public void testCreateAndCheckTableStore(
         }
     }
 
+    @Test
+    public void testOnCompactTableForNoSnapshot() {
+        RowType partType = RowType.of();
+        MockTableStoreManagedFactory mockTableStoreManagedFactory =
+                new MockTableStoreManagedFactory(partType, 
NON_PARTITIONED_ROW_TYPE);
+        prepare(
+                TABLE + "_" + UUID.randomUUID(),
+                partType,
+                NON_PARTITIONED_ROW_TYPE,
+                NON_PARTITIONED,
+                true);
+        assertThatThrownBy(
+                        () ->
+                                mockTableStoreManagedFactory.onCompactTable(
+                                        context, new 
CatalogPartitionSpec(emptyMap())))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("The specified table to compact does not 
exist any snapshot");
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOnCompactTableForNonPartitioned(boolean rescaleBucket) 
throws Exception {
+        RowType partType = RowType.of();
+        runTest(
+                new MockTableStoreManagedFactory(partType, 
NON_PARTITIONED_ROW_TYPE),
+                TABLE + "_" + UUID.randomUUID(),
+                partType,
+                NON_PARTITIONED_ROW_TYPE,
+                NON_PARTITIONED,
+                rescaleBucket);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOnCompactTableForSinglePartitioned(boolean rescaleBucket) 
throws Exception {
+        runTest(
+                new MockTableStoreManagedFactory(
+                        SINGLE_PARTITIONED_PART_TYPE, 
SINGLE_PARTITIONED_ROW_TYPE),
+                TABLE + "_" + UUID.randomUUID(),
+                SINGLE_PARTITIONED_PART_TYPE,
+                SINGLE_PARTITIONED_ROW_TYPE,
+                SINGLE_PARTITIONED,
+                rescaleBucket);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOnCompactTableForMultiPartitioned(boolean rescaleBucket) 
throws Exception {
+        runTest(
+                new MockTableStoreManagedFactory(),
+                TABLE + "_" + UUID.randomUUID(),
+                DEFAULT_PART_TYPE,
+                DEFAULT_ROW_TYPE,
+                MULTI_PARTITIONED,
+                rescaleBucket);
+    }
+
+    @MethodSource("provideManifest")
+    @ParameterizedTest

Review Comment:
   Can we give this `ParameterizedTest` a name?
   It is very difficult to maintain without one.



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean 
ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new 
HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        
catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", 
catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = 
plan.groupByPartFiles();
+        if 
(!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new 
FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new 
KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        try {
+            newOptions.put(
+                    COMPACTION_SCANNED_MANIFEST.key(),
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    InstantiationUtil.serializeObject(
+                                            new PartitionedManifestMeta(
+                                                    plan.snapshotId(), 
groupBy))));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy,
+            MergeTreeOptions options,
+            Comparator<RowData> keyComparator) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filtered = new 
HashMap<>();
+
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> 
partEntry :
+                groupBy.entrySet()) {
+            Map<Integer, List<DataFileMeta>> manifests = new HashMap<>();

Review Comment:
   Maybe a better name: `partFiles`?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java:
##########
@@ -61,20 +61,24 @@ public FileStorePathFactory(Path root, RowType 
partitionType, String defaultPart
         this.root = root;
         this.uuid = UUID.randomUUID().toString();
 
-        String[] partitionColumns = partitionType.getFieldNames().toArray(new 
String[0]);
-        this.partitionComputer =
-                new RowDataPartitionComputer(
-                        defaultPartValue,
-                        partitionColumns,
-                        partitionType.getFields().stream()
-                                .map(f -> 
LogicalTypeDataTypeConverter.toDataType(f.getType()))
-                                .toArray(DataType[]::new),
-                        partitionColumns);
+        this.partitionComputer = getPartitionComputer(partitionType, 
defaultPartValue);
 
         this.manifestFileCount = new AtomicInteger(0);
         this.manifestListCount = new AtomicInteger(0);
     }
 
+    public static RowDataPartitionComputer getPartitionComputer(

Review Comment:
   Is this public static method introduced just for tests?



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean 
ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new 
HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        
catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", 
catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = 
plan.groupByPartFiles();
+        if 
(!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new 
FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new 
KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        try {
+            newOptions.put(
+                    COMPACTION_SCANNED_MANIFEST.key(),
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    InstantiationUtil.serializeObject(
+                                            new PartitionedManifestMeta(
+                                                    plan.snapshotId(), 
groupBy))));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(

Review Comment:
   You have picked the files here, but how do we make sure that the writer will compact
these files?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to