leaves12138 commented on code in PR #2457:
URL: https://github.com/apache/incubator-paimon/pull/2457#discussion_r1418405072
##########
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java:
##########
@@ -124,28 +148,155 @@ private boolean execute(
            FileStoreTable table,
            String sortType,
            List<String> sortColumns,
-            @Nullable String filter) {
-        CoreOptions coreOptions = table.store().options();
-
-        // sort only works with bucket=-1 yet
-        if (!TableSorter.OrderType.of(sortType).equals(TableSorter.OrderType.NONE)) {
-            if (!(table instanceof AppendOnlyFileStoreTable) || coreOptions.bucket() != -1) {
-                throw new UnsupportedOperationException(
-                        "Spark compact with sort_type "
-                                + sortType
-                                + " only support unaware-bucket append-only table yet.");
+            @Nullable String partitions) {
+        table = table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
+        BucketMode bucketMode = table.bucketMode();
+        TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType);
+
+        if (orderType.equals(TableSorter.OrderType.NONE)) {
+            JavaSparkContext javaSparkContext = new JavaSparkContext(spark().sparkContext());
+            Predicate filter =
+                    StringUtils.isBlank(partitions)
+                            ? null
+                            : ParameterUtils.getPartitionFilter(
+                                    ParameterUtils.getPartitions(partitions), table.rowType());
+            switch (bucketMode) {
+                case FIXED:
+                case DYNAMIC:
+                    compactAwareBucketTable(table, filter, javaSparkContext);
+                    break;
+                case UNAWARE:
+                    compactUnAwareBucketTable(table, filter, javaSparkContext);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Spark compact with " + bucketMode + " is not support yet.");
+            }
+        } else {
+            switch (bucketMode) {
+                case UNAWARE:
+                    sortCompactUnAwareBucketTable(table, orderType, sortColumns, partitions);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Spark compact with sort_type "
+                                    + sortType
+                                    + " only support unaware-bucket append-only table yet.");
+            }
+        }
+        return true;
+    }
+
+    private void compactAwareBucketTable(
+            FileStoreTable table, @Nullable Predicate filter, JavaSparkContext javaSparkContext) {
+        InnerTableScan scan = table.newScan();
+        if (filter != null) {
+            scan.withFilter(filter);
+        }
+
+        List<Split> splits = scan.plan().splits();
+        if (splits.isEmpty()) {
+            return;
+        }
+
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        JavaRDD<CommitMessage> commitMessageJavaRDD =
+                javaSparkContext
+                        .parallelize(splits)
+                        .mapPartitions(
+                                (FlatMapFunction<Iterator<Split>, CommitMessage>)
+                                        splitIterator -> {
+                                            IOManager ioManager = SparkUtils.createIOManager();
+                                            BatchTableWrite write = writeBuilder.newWrite();
+                                            write.withIOManager(ioManager);
Review Comment:
If one partition bucket has two splits, this may cause an error: `parallelize` can scatter the two splits across different tasks, so two `BatchTableWrite` instances may end up compacting the same bucket concurrently.
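
A minimal sketch of one possible way to avoid this (illustrative only, not code from this PR), assuming the `Split`s returned by an aware-bucket scan are `DataSplit`s exposing `partition()` and `bucket()`, and using commons-lang3 `Pair` as the grouping key:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;

public class SplitGrouping {

    /**
     * Groups splits so that all splits of one (partition, bucket) pair land in
     * the same group, and therefore in the same Spark task.
     */
    static List<List<Split>> groupByPartitionBucket(List<Split> splits) {
        Map<Pair<BinaryRow, Integer>, List<Split>> grouped =
                splits.stream()
                        .collect(
                                Collectors.groupingBy(
                                        split -> {
                                            DataSplit dataSplit = (DataSplit) split;
                                            // BinaryRow implements equals/hashCode,
                                            // so the pair is a safe grouping key.
                                            return Pair.of(
                                                    dataSplit.partition(),
                                                    dataSplit.bucket());
                                        }));
        return new ArrayList<>(grouped.values());
    }
}
```

Parallelizing `groupByPartitionBucket(splits)` instead of the raw `splits`, and creating one `BatchTableWrite` per group inside `mapPartitions`, would guarantee that no two tasks write to the same bucket concurrently.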