leaves12138 commented on code in PR #2457:
URL: https://github.com/apache/incubator-paimon/pull/2457#discussion_r1418405072


##########
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java:
##########
@@ -124,28 +148,155 @@ private boolean execute(
             FileStoreTable table,
             String sortType,
             List<String> sortColumns,
-            @Nullable String filter) {
-        CoreOptions coreOptions = table.store().options();
-
-        // sort only works with bucket=-1 yet
-        if 
(!TableSorter.OrderType.of(sortType).equals(TableSorter.OrderType.NONE)) {
-            if (!(table instanceof AppendOnlyFileStoreTable) || 
coreOptions.bucket() != -1) {
-                throw new UnsupportedOperationException(
-                        "Spark compact with sort_type "
-                                + sortType
-                                + " only support unaware-bucket append-only 
table yet.");
+            @Nullable String partitions) {
+        table = 
table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
+        BucketMode bucketMode = table.bucketMode();
+        TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType);
+
+        if (orderType.equals(TableSorter.OrderType.NONE)) {
+            JavaSparkContext javaSparkContext = new 
JavaSparkContext(spark().sparkContext());
+            Predicate filter =
+                    StringUtils.isBlank(partitions)
+                            ? null
+                            : ParameterUtils.getPartitionFilter(
+                                    ParameterUtils.getPartitions(partitions), 
table.rowType());
+            switch (bucketMode) {
+                case FIXED:
+                case DYNAMIC:
+                    compactAwareBucketTable(table, filter, javaSparkContext);
+                    break;
+                case UNAWARE:
+                    compactUnAwareBucketTable(table, filter, javaSparkContext);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Spark compact with " + bucketMode + " is not 
support yet.");
+            }
+        } else {
+            switch (bucketMode) {
+                case UNAWARE:
+                    sortCompactUnAwareBucketTable(table, orderType, 
sortColumns, partitions);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Spark compact with sort_type "
+                                    + sortType
+                                    + " only support unaware-bucket 
append-only table yet.");
+            }
+        }
+        return true;
+    }
+
+    private void compactAwareBucketTable(
+            FileStoreTable table, @Nullable Predicate filter, JavaSparkContext 
javaSparkContext) {
+        InnerTableScan scan = table.newScan();
+        if (filter != null) {
+            scan.withFilter(filter);
+        }
+
+        List<Split> splits = scan.plan().splits();
+        if (splits.isEmpty()) {
+            return;
+        }
+
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        JavaRDD<CommitMessage> commitMessageJavaRDD =
+                javaSparkContext
+                        .parallelize(splits)
+                        .mapPartitions(
+                                (FlatMapFunction<Iterator<Split>, 
CommitMessage>)
+                                        splitIterator -> {
+                                            IOManager ioManager = 
SparkUtils.createIOManager();
+                                            BatchTableWrite write = 
writeBuilder.newWrite();
+                                            write.withIOManager(ioManager);

Review Comment:
   If one partition bucket has two splits, this may cause an error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to