LadyForest commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r893749689


##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +186,11 @@ public void onDropTable(Context context, boolean 
ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new 
HashMap<>(context.getCatalogTable().getOptions());
+        newOptions.put(COMPACTION_MANUAL_TRIGGERED.key(), 
String.valueOf(true));
+        newOptions.put(
+                COMPACTION_PARTITION_SPEC.key(),
+                JsonSerdeUtil.toJson(catalogPartitionSpec.getPartitionSpec()));

Review Comment:
   > Compact without partition is not supported now?
   
   For partitioned table, both `ALTER TABLE table_identifier COMPACT` and 
`ALTER TABLE table_identifier PARTITION (part_spec) COMPACT` are supported. If 
the user specifies a partition to compact, we need to keep track of that info 
in the options. Otherwise, `StoreSinkCompactor` cannot know which partition 
needs to be compacted at runtime.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to