JingsongLi commented on code in PR #138:
URL: https://github.com/apache/flink-table-store/pull/138#discussion_r884379183
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java:
##########
@@ -55,14 +56,14 @@ public FileStoreReadImpl(
            WriteMode writeMode,
            RowType keyType,
            RowType valueType,
-           Comparator<RowData> keyComparator,
+           Supplier<Comparator<RowData>> keyComparatorSupplier,
Review Comment:
Why does a util method change the `Comparator` to a `Supplier`?
We shouldn't create the comparator repeatedly unless there is a thread-safety issue here.
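If the only concern is repeated creation, the supplier could also create the comparator once and cache it. A rough sketch of that idea (illustrative only, not this PR's `KeyComparatorSupplier`; the caching field and stand-in comparator are assumptions):

```java
import java.io.Serializable;
import java.util.Comparator;
import java.util.function.Supplier;

import org.apache.flink.table.data.RowData;

/** Illustrative only: creates the comparator lazily, once per deserialized instance. */
class CachingKeyComparatorSupplier implements Supplier<Comparator<RowData>>, Serializable {

    // transient so a (possibly non-serializable, generated) comparator is rebuilt after deserialization
    private transient Comparator<RowData> comparator;

    @Override
    public Comparator<RowData> get() {
        if (comparator == null) {
            // stand-in for the real key comparator; no synchronization shown
            comparator = Comparator.comparingInt(RowData::getArity);
        }
        return comparator;
    }
}
```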
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        try {
+            newOptions.put(
+                    COMPACTION_SCANNED_MANIFEST.key(),
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    InstantiationUtil.serializeObject(
+                                            new PartitionedManifestMeta(
+                                                    plan.snapshotId(), groupBy))));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy,
+            MergeTreeOptions options,
+            Comparator<RowData> keyComparator) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filtered = new HashMap<>();
+
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                groupBy.entrySet()) {
+            Map<Integer, List<DataFileMeta>> manifests = new HashMap<>();
+            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
+                    partEntry.getValue().entrySet()) {
+                List<DataFileMeta> smallFiles =
+                        bucketEntry.getValue().stream()
+                                .filter(fileMeta -> fileMeta.fileSize() < options.targetFileSize)
+                                .collect(Collectors.toList());
+                List<DataFileMeta> intersectedFiles =
+                        new IntervalPartition(bucketEntry.getValue(), keyComparator)
+                                .partition().stream()
+                                .filter(section -> section.size() > 1)
+                                .flatMap(Collection::stream)
+                                .map(SortedRun::files)
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList());
+
+                List<DataFileMeta> filteredFiles =
Review Comment:
`bucketFiles`?
##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java:
##########
@@ -256,9 +284,288 @@ public void testCreateAndCheckTableStore(
         }
     }
+    @Test
+    public void testOnCompactTableForNoSnapshot() {
+        RowType partType = RowType.of();
+        MockTableStoreManagedFactory mockTableStoreManagedFactory =
+                new MockTableStoreManagedFactory(partType, NON_PARTITIONED_ROW_TYPE);
+        prepare(
+                TABLE + "_" + UUID.randomUUID(),
+                partType,
+                NON_PARTITIONED_ROW_TYPE,
+                NON_PARTITIONED,
+                true);
+        assertThatThrownBy(
+                        () ->
+                                mockTableStoreManagedFactory.onCompactTable(
+                                        context, new CatalogPartitionSpec(emptyMap())))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("The specified table to compact does not exist any snapshot");
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOnCompactTableForNonPartitioned(boolean rescaleBucket) throws Exception {
+        RowType partType = RowType.of();
+        runTest(
+                new MockTableStoreManagedFactory(partType, NON_PARTITIONED_ROW_TYPE),
+                TABLE + "_" + UUID.randomUUID(),
+                partType,
+                NON_PARTITIONED_ROW_TYPE,
+                NON_PARTITIONED,
+                rescaleBucket);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOnCompactTableForSinglePartitioned(boolean rescaleBucket) throws Exception {
+        runTest(
+                new MockTableStoreManagedFactory(
+                        SINGLE_PARTITIONED_PART_TYPE, SINGLE_PARTITIONED_ROW_TYPE),
+                TABLE + "_" + UUID.randomUUID(),
+                SINGLE_PARTITIONED_PART_TYPE,
+                SINGLE_PARTITIONED_ROW_TYPE,
+                SINGLE_PARTITIONED,
+                rescaleBucket);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOnCompactTableForMultiPartitioned(boolean rescaleBucket) throws Exception {
+        runTest(
+                new MockTableStoreManagedFactory(),
+                TABLE + "_" + UUID.randomUUID(),
+                DEFAULT_PART_TYPE,
+                DEFAULT_ROW_TYPE,
+                MULTI_PARTITIONED,
+                rescaleBucket);
+    }
+
+    @MethodSource("provideManifest")
+    @ParameterizedTest
Review Comment:
Can we give the `ParameterizedTest` a name?
It is very difficult to maintain without one.
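For example, JUnit 5 supports a display-name pattern on the annotation. A small sketch (assuming the existing `runTest` helper; the pattern string is just an example):

```java
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class NamedParameterizedTestSketch {

    // Each invocation is then reported as e.g. "rescaleBucket = true" / "rescaleBucket = false".
    @ParameterizedTest(name = "rescaleBucket = {0}")
    @ValueSource(booleans = {true, false})
    void testOnCompactTableForNonPartitioned(boolean rescaleBucket) {
        // delegate to the existing runTest(...) helper here
    }
}
```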
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        try {
+            newOptions.put(
+                    COMPACTION_SCANNED_MANIFEST.key(),
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    InstantiationUtil.serializeObject(
+                                            new PartitionedManifestMeta(
+                                                    plan.snapshotId(), groupBy))));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy,
+            MergeTreeOptions options,
+            Comparator<RowData> keyComparator) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filtered = new HashMap<>();
+
+        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
+                groupBy.entrySet()) {
+            Map<Integer, List<DataFileMeta>> manifests = new HashMap<>();
Review Comment:
Maybe a better name: `partFiles`?
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java:
##########
@@ -61,20 +61,24 @@ public FileStorePathFactory(Path root, RowType partitionType, String defaultPart
         this.root = root;
         this.uuid = UUID.randomUUID().toString();
-        String[] partitionColumns = partitionType.getFieldNames().toArray(new String[0]);
-        this.partitionComputer =
-                new RowDataPartitionComputer(
-                        defaultPartValue,
-                        partitionColumns,
-                        partitionType.getFields().stream()
-                                .map(f -> LogicalTypeDataTypeConverter.toDataType(f.getType()))
-                                .toArray(DataType[]::new),
-                        partitionColumns);
+        this.partitionComputer = getPartitionComputer(partitionType, defaultPartValue);
         this.manifestFileCount = new AtomicInteger(0);
         this.manifestListCount = new AtomicInteger(0);
     }
+    public static RowDataPartitionComputer getPartitionComputer(
Review Comment:
Is this public static method just for tests?
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreManagedFactory.java:
##########
@@ -183,6 +204,84 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        FileStoreScan.Plan plan =
+                fileStore
+                        .newScan()
+                        .withPartitionFilter(
+                                PredicateConverter.CONVERTER.fromMap(
+                                        catalogPartitionSpec.getPartitionSpec(),
+                                        fileStore.partitionType()))
+                        .plan();
+
+        Preconditions.checkState(
+                plan.snapshotId() != null && !plan.files().isEmpty(),
+                "The specified %s to compact does not exist any snapshot",
+                catalogPartitionSpec.getPartitionSpec().isEmpty()
+                        ? "table"
+                        : String.format("partition %s", catalogPartitionSpec.getPartitionSpec()));
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+        if (!Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            groupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            new KeyComparatorSupplier(fileStore.partitionType()).get());
+        }
+        try {
+            newOptions.put(
+                    COMPACTION_SCANNED_MANIFEST.key(),
+                    Base64.getEncoder()
+                            .encodeToString(
+                                    InstantiationUtil.serializeObject(
+                                            new PartitionedManifestMeta(
+                                                    plan.snapshotId(), groupBy))));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return newOptions;
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
Review Comment:
You have picked the files here, but how do we make sure that the writer will compact
exactly these files?
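For context, the picked files are serialized into `COMPACTION_SCANNED_MANIFEST` above, so presumably the writer reads that option back and restricts compaction to those files. A rough sketch of that decoding side (an assumption about the design, not code from this PR; the class and method names are made up):

```java
import java.util.Base64;
import java.util.Map;

import org.apache.flink.util.InstantiationUtil;

class ScannedManifestDecoder {

    /** Recovers the object that onCompactTable Base64-encoded into the table options. */
    static <T> T decode(Map<String, String> options, String key) throws Exception {
        byte[] bytes = Base64.getDecoder().decode(options.get(key));
        // in this PR the payload would be the serialized PartitionedManifestMeta,
        // which the writer could then use to compact exactly the picked files
        return InstantiationUtil.deserializeObject(
                bytes, ScannedManifestDecoder.class.getClassLoader());
    }
}
```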
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]