openinx commented on code in PR #119:
URL: https://github.com/apache/flink-table-store/pull/119#discussion_r876710336
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java:
##########
@@ -175,28 +181,33 @@ private MergeEngine mergeEngine() {
return options.get(MERGE_ENGINE);
}
- private FileStore buildFileStore() {
+ FileStore buildFileStore() {
RowType partitionType = TypeUtils.project(type, partitionKeysIndex());
FileStoreOptions fileStoreOptions = new FileStoreOptions(options);
+ String json = options.get(COMPACTION_SCANNED_MANIFEST);
+ PartitionedManifestMeta manifestMeta =
+ json == null ? null : PartitionedManifestMeta.fromJson(json);
int[] trimmedPrimaryKeys = trimmedPrimaryKeysIndex();
if (trimmedPrimaryKeys.length == 0) {
return FileStoreImpl.createWithValueCount(
- fileStoreOptions.path(tableIdentifier).toString(),
- schema.id(),
- fileStoreOptions,
- user,
- partitionType,
- type);
+ fileStoreOptions.path(tableIdentifier).toString(),
+ schema.id(),
+ fileStoreOptions,
+ user,
+ partitionType,
+ type)
+ .withPartitionedMeta(manifestMeta);
Review Comment:
Nit: could we just pass the `manifestMeta` as a constructor argument? All the other
arguments are constructor arguments, so handling only `manifestMeta` through a
separate setter looks strange to me.
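Rough sketch of what I mean (illustrative only; the extra parameter is an assumption, not the current `createWithValueCount` signature):

```java
// Hypothetical: hand the parsed manifest meta over together with the other
// constructor arguments instead of chaining withPartitionedMeta() afterwards.
return FileStoreImpl.createWithValueCount(
        fileStoreOptions.path(tableIdentifier).toString(),
        schema.id(),
        fileStoreOptions,
        user,
        partitionType,
        type,
        manifestMeta); // assumed extra argument, null when no compaction scan is configured
```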
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java:
##########
@@ -175,28 +181,33 @@ private MergeEngine mergeEngine() {
return options.get(MERGE_ENGINE);
}
- private FileStore buildFileStore() {
+ FileStore buildFileStore() {
RowType partitionType = TypeUtils.project(type, partitionKeysIndex());
FileStoreOptions fileStoreOptions = new FileStoreOptions(options);
+ String json = options.get(COMPACTION_SCANNED_MANIFEST);
+ PartitionedManifestMeta manifestMeta =
+ json == null ? null : PartitionedManifestMeta.fromJson(json);
Review Comment:
Q: do you mean people will need to set a JSON-serialized `PartitionedManifestMeta`
string when creating a managed Flink native table? If so, I worry that the bar is
too high for users.
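To illustrate the concern, that would mean a user hand-crafting something like the following (hypothetical usage, placeholder payload):

```java
// Hypothetical illustration: a hand-written serialized PartitionedManifestMeta
// set as a plain string table option -- very easy to get wrong by hand.
Map<String, String> tableOptions = new HashMap<>();
tableOptions.put(
        COMPACTION_SCANNED_MANIFEST.key(),
        "<hand-written PartitionedManifestMeta JSON>");
```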
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java:
##########
@@ -258,16 +269,17 @@ private long discoveryIntervalMills() {
private FileStoreSource buildFileSource(
boolean isContinuous, boolean continuousScanLatest) {
+ FileStore fileStore = buildFileStore();
return new FileStoreSource(
- buildFileStore(),
+ fileStore,
schema.primaryKeys().isEmpty(),
isContinuous,
discoveryIntervalMills(),
continuousScanLatest,
projectedFields,
partitionPredicate,
fieldPredicate,
- null);
+ ((FileStoreImpl) fileStore).getSpecifiedPartitionedMeta());
Review Comment:
It makes more sense to get this `partitionedMeta` by parsing the Flink options
again (or by adding an extra `withPartitionedMeta` method to this `SourceBuilder`
that takes the parsed value), rather than getting it from a cast `FileStore`
implementation.
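Something along these lines, for example (the method and field are only a sketch of the suggestion, not existing `SourceBuilder` API):

```java
// Hypothetical SourceBuilder addition: the caller passes the parsed value
// explicitly instead of casting the FileStore back to FileStoreImpl.
public SourceBuilder withPartitionedMeta(@Nullable PartitionedManifestMeta partitionedMeta) {
    this.partitionedMeta = partitionedMeta;
    return this;
}
```

The builder could then forward `partitionedMeta` straight into the `FileStoreSource` constructor.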
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java:
##########
@@ -165,7 +187,36 @@ public void onDropTable(Context context, boolean ignoreIfNotExists) {
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
-        throw new UnsupportedOperationException("Not implement yet");
+        Map<String, String> newOptions = new HashMap<>(context.getCatalogTable().getOptions());
+        FileStore fileStore = buildTableStore(context).buildFileStore();
+        Predicate partitionFilter =
+                PredicateConverter.CONVERTER.fromMap(
+                        catalogPartitionSpec.getPartitionSpec(), fileStore.partitionType());
+        FileStoreScan.Plan plan = fileStore.newScan().withPartitionFilter(partitionFilter).plan();
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = plan.groupByPartFiles();
+
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filteredGroupBy;
+        if (Boolean.parseBoolean(newOptions.get(COMPACTION_RESCALE_BUCKET.key()))) {
+            filteredGroupBy = groupBy;
+        } else {
+            filteredGroupBy =
+                    pickManifest(
+                            groupBy,
+                            new FileStoreOptions(Configuration.fromMap(newOptions))
+                                    .mergeTreeOptions(),
+                            fileStore.keyType().getChildren());
+        }
+        PartitionedManifestMeta manifestMeta =
+                new PartitionedManifestMeta(plan.snapshotId(), filteredGroupBy);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Data files to be compacted for managed table {} are {} with root path {}",
+                    context.getObjectIdentifier().asSerializableString(),
+                    manifestMeta.asString(fileStore.partitionType()),
+                    newOptions.get(PATH.key()));
+        }
+        newOptions.put(COMPACTION_SCANNED_MANIFEST.key(), manifestMeta.toJson());
Review Comment:
Okay, it looks like this method encodes the object into the option for us, rather
than requiring it to be set manually.
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java:
##########
@@ -94,8 +94,9 @@ private RecordWriter getWriter(BinaryRowData partition, int bucket) {
                 k ->
                         overwrite
                                 ? fileStoreWrite.createEmptyWriter(
-                                        partition, bucket, compactExecutor)
-                                : fileStoreWrite.createWriter(partition, bucket, compactExecutor));
+                                        partition.copy(), bucket, compactExecutor)
Review Comment:
This is an existing bug in the previous version, right?
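For context, my understanding (not verified against the old code) is that the incoming `BinaryRowData` may be a reused instance, so keeping it directly as a map key is unsafe. Roughly (`writers` is a placeholder for the writer cache here):

```java
// Illustrative only: the runtime may reuse the same BinaryRowData object across
// records, so the key stored in the writer cache must be a defensive copy.
writers.computeIfAbsent(
        partition.copy(), // without copy(), later reuse of `partition` corrupts the cached key
        p -> fileStoreWrite.createWriter(p, bucket, compactExecutor));
```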
##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.table.store.file.utils.SnapshotFinder;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.store.file.FileStoreOptions.relativeTablePath;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for 'ALTER TABLE ... COMPACT'. */
+public class AlterTableCompactITCase extends FileStoreTableITCase {
+
+ @Override
+ protected List<String> ddl() {
+ return Arrays.asList(
+ "CREATE TABLE IF NOT EXISTS T0 (f0 INT, f1 STRING, f2 DOUBLE)",
+ "CREATE TABLE IF NOT EXISTS T1 ("
+ + "f0 INT, f1 STRING, f2 STRING) PARTITIONED BY (f1)",
+ "CREATE TABLE IF NOT EXISTS T2 ("
+ + "f0 INT, f1 STRING, f2 STRING) PARTITIONED BY (f1,
f0)");
+ }
+
+ @Test
+ public void testNonPartitioned() throws Exception {
+ bEnv.executeSql("INSERT INTO T0 VALUES (1, 'Pride and Prejudice', 9.0), (2, 'Emma', 8.5)")
Review Comment:
Nit: we can use `sql` to replace `bEnv.executeSql` once we get this PR merged:
https://github.com/apache/flink-table-store/pull/126/files#diff-9446f62b57e1655e58f126c1df332f38cced7102fed6c5f47dba409ccc924b14R99-R102
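i.e. with the helper from #126, the statement could shrink to something like this (assuming a `sql(String, Object...)` helper as proposed there):

```java
// Hypothetical usage of the sql(...) helper proposed in PR #126.
sql("INSERT INTO T0 VALUES (1, 'Pride and Prejudice', 9.0), (2, 'Emma', 8.5)");
```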
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java:
##########
@@ -309,4 +360,41 @@ TableStore buildTableStore(Context context) {
return store;
}
+
+    private Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> pickManifest(
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy,
+            MergeTreeOptions options,
+            List<LogicalType> keyTypes) {
+        Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> filteredGroupBy = new HashMap<>();
+        Comparator<RowData> keyComparator =
+                CodeGenUtils.generateRecordComparator(new TableConfig(), keyTypes, "KeyComparator")
+                        .newInstance(Thread.currentThread().getContextClassLoader());
Review Comment:
Let's make this into a separate method in a utility class?
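For example, something like this (class and method names are placeholders):

```java
// Hypothetical utility wrapping the code-generated key comparator creation.
public class KeyComparatorUtils {

    private KeyComparatorUtils() {}

    public static Comparator<RowData> newKeyComparator(List<LogicalType> keyTypes) {
        return CodeGenUtils.generateRecordComparator(new TableConfig(), keyTypes, "KeyComparator")
                .newInstance(Thread.currentThread().getContextClassLoader());
    }
}
```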