This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new dc4552491 [flink] support to infer parallelism for system table (#3201)
dc4552491 is described below
commit dc4552491a3f679f6e6591e816a465443cb1d211
Author: Aitozi <[email protected]>
AuthorDate: Fri Apr 12 13:30:24 2024 +0800
[flink] support to infer parallelism for system table (#3201)
---
.../paimon/flink/AbstractFlinkTableFactory.java | 5 +-
.../paimon/flink/source/DataTableSource.java | 79 +----------
.../paimon/flink/source/FlinkTableSource.java | 77 +++++++++++
.../paimon/flink/source/SystemTableSource.java | 32 ++++-
.../paimon/flink/source/DataTableSourceTest.java | 147 ++++++++++++++-------
5 files changed, 211 insertions(+), 129 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 782a9804e..c42780afa 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -100,7 +100,10 @@ public abstract class AbstractFlinkTableFactory
== RuntimeExecutionMode.STREAMING;
if (origin instanceof SystemCatalogTable) {
return new PushedTableSource(
- new SystemTableSource(((SystemCatalogTable)
origin).table(), isStreamingMode));
+ new SystemTableSource(
+ ((SystemCatalogTable) origin).table(),
+ isStreamingMode,
+ context.getObjectIdentifier()));
} else {
Table table = buildPaimonTable(context);
if (table instanceof FileStoreTable) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index fd29dd048..ff0312ee5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -22,7 +22,6 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.LogChangelogMode;
import org.apache.paimon.CoreOptions.LogConsistency;
-import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkConnectorOptions.WatermarkEmitStrategy;
import org.apache.paimon.flink.PaimonDataStreamScanProvider;
import org.apache.paimon.flink.log.LogSourceProvider;
@@ -32,13 +31,9 @@ import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.Projection;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.LookupTableSource.LookupContext;
@@ -67,7 +62,6 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGN
import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
-import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
import static org.apache.paimon.utils.Preconditions.checkState;
/**
@@ -79,9 +73,6 @@ import static org.apache.paimon.utils.Preconditions.checkState;
* LogSourceProvider}.
*/
public class DataTableSource extends FlinkTableSource {
- private static final String FLINK_INFER_SCAN_PARALLELISM =
- String.format(
- "%s%s", PAIMON_PREFIX,
FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key());
private final ObjectIdentifier tableIdentifier;
private final boolean streaming;
@@ -90,8 +81,6 @@ public class DataTableSource extends FlinkTableSource {
@Nullable private WatermarkStrategy<RowData> watermarkStrategy;
- private SplitStatistics splitStatistics;
-
@Nullable private List<String> dynamicPartitionFilteringFields;
public DataTableSource(
@@ -211,48 +200,12 @@ public class DataTableSource extends FlinkTableSource {
.withDynamicPartitionFilteringFields(dynamicPartitionFilteringFields);
return new PaimonDataStreamScanProvider(
- !streaming, env -> configureSource(sourceBuilder, env));
- }
-
- private DataStream<RowData> configureSource(
- FlinkSourceBuilder sourceBuilder, StreamExecutionEnvironment env) {
- Options options = Options.fromMap(this.table.options());
- Configuration envConfig = (Configuration) env.getConfiguration();
- if (envConfig.containsKey(FLINK_INFER_SCAN_PARALLELISM)) {
- options.set(
- FlinkConnectorOptions.INFER_SCAN_PARALLELISM,
-
Boolean.parseBoolean(envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM)));
- }
- Integer parallelism =
options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
- if (parallelism == null &&
options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
- if (streaming) {
- parallelism = options.get(CoreOptions.BUCKET);
- } else {
- scanSplitsForInference();
- parallelism = splitStatistics.splitNumber();
- if (null != limit && limit > 0) {
- int limitCount =
- limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE :
limit.intValue();
- parallelism = Math.min(parallelism, limitCount);
- }
-
- parallelism = Math.max(1, parallelism);
- }
- parallelism =
- Math.min(
- parallelism,
-
options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM));
- }
-
- return sourceBuilder.withParallelism(parallelism).withEnv(env).build();
- }
-
- private void scanSplitsForInference() {
- if (splitStatistics == null) {
- List<Split> splits =
-
table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
- splitStatistics = new SplitStatistics(splits);
- }
+ !streaming,
+ env ->
+ sourceBuilder
+ .withParallelism(inferSourceParallelism(env))
+ .withEnv(env)
+ .build());
}
@Override
@@ -335,24 +288,4 @@ public class DataTableSource extends FlinkTableSource {
public boolean isStreaming() {
return streaming;
}
-
- /** Split statistics for inferring row count and parallelism size. */
- protected static class SplitStatistics {
-
- private final int splitNumber;
- private final long totalRowCount;
-
- private SplitStatistics(List<Split> splits) {
- this.splitNumber = splits.size();
- this.totalRowCount =
splits.stream().mapToLong(Split::rowCount).sum();
- }
-
- public int splitNumber() {
- return splitNumber;
- }
-
- public long totalRowCount() {
- return totalRowCount;
- }
- }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 68209240e..7254eefaa 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -18,15 +18,21 @@
package org.apache.paimon.flink.source;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.PredicateConverter;
+import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.PartitionPredicateVisitor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.PredicateVisitor;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.LookupTableSource.LookupContext;
import
org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
@@ -46,16 +52,23 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
+
/** A Flink {@link ScanTableSource} for paimon. */
public abstract class FlinkTableSource {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkTableSource.class);
+ protected static final String FLINK_INFER_SCAN_PARALLELISM =
+ String.format(
+ "%s%s", PAIMON_PREFIX,
FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key());
+
protected final Table table;
@Nullable protected Predicate predicate;
@Nullable protected int[][] projectFields;
@Nullable protected Long limit;
+ protected SplitStatistics splitStatistics;
public FlinkTableSource(Table table) {
this(table, null, null, null);
@@ -132,4 +145,68 @@ public abstract class FlinkTableSource {
public abstract void applyDynamicFiltering(List<String>
candidateFilterFields);
public abstract boolean isStreaming();
+
+ @Nullable
+ protected Integer inferSourceParallelism(StreamExecutionEnvironment env) {
+ Options options = Options.fromMap(this.table.options());
+ Configuration envConfig = (Configuration) env.getConfiguration();
+ if (envConfig.containsKey(FLINK_INFER_SCAN_PARALLELISM)) {
+ options.set(
+ FlinkConnectorOptions.INFER_SCAN_PARALLELISM,
+
Boolean.parseBoolean(envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM)));
+ }
+ Integer parallelism =
options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
+ if (parallelism == null &&
options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
+ if (isStreaming()) {
+ parallelism = Math.max(1, options.get(CoreOptions.BUCKET));
+ } else {
+ scanSplitsForInference();
+ parallelism = splitStatistics.splitNumber();
+ if (null != limit && limit > 0) {
+ int limitCount =
+ limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE :
limit.intValue();
+ parallelism = Math.min(parallelism, limitCount);
+ }
+
+ parallelism = Math.max(1, parallelism);
+ parallelism =
+ Math.min(
+ parallelism,
+
options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM));
+ }
+ }
+ return parallelism;
+ }
+
+ protected void scanSplitsForInference() {
+ if (splitStatistics == null) {
+ List<Split> splits =
+
table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
+ splitStatistics = new SplitStatistics(splits);
+ }
+ }
+
+ /** Split statistics for inferring row count and parallelism size. */
+ protected static class SplitStatistics {
+
+ private final int splitNumber;
+ private final long totalRowCount;
+
+ protected SplitStatistics(List<Split> splits) {
+ this.splitNumber = splits.size();
+ this.totalRowCount =
splits.stream().mapToLong(Split::rowCount).sum();
+ }
+
+ public int splitNumber() {
+ return splitNumber;
+ }
+
+ public long totalRowCount() {
+ return totalRowCount;
+ }
+ }
+
+ public Table getTable() {
+ return table;
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index b577a73aa..49ed0c0b8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.source;
import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.PaimonDataStreamScanProvider;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.DataTable;
@@ -26,12 +27,14 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
import
org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
-import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.plan.stats.TableStats;
@@ -46,13 +49,16 @@ public class SystemTableSource extends FlinkTableSource {
private final boolean isStreamingMode;
private final int splitBatchSize;
private final FlinkConnectorOptions.SplitAssignMode splitAssignMode;
+ private final ObjectIdentifier tableIdentifier;
- public SystemTableSource(Table table, boolean isStreamingMode) {
+ public SystemTableSource(
+ Table table, boolean isStreamingMode, ObjectIdentifier
tableIdentifier) {
super(table);
this.isStreamingMode = isStreamingMode;
Options options = Options.fromMap(table.options());
this.splitBatchSize =
options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE);
this.splitAssignMode =
options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE);
+ this.tableIdentifier = tableIdentifier;
}
public SystemTableSource(
@@ -62,11 +68,13 @@ public class SystemTableSource extends FlinkTableSource {
@Nullable int[][] projectFields,
@Nullable Long limit,
int splitBatchSize,
- FlinkConnectorOptions.SplitAssignMode splitAssignMode) {
+ FlinkConnectorOptions.SplitAssignMode splitAssignMode,
+ ObjectIdentifier tableIdentifier) {
super(table, predicate, projectFields, limit);
this.isStreamingMode = isStreamingMode;
this.splitBatchSize = splitBatchSize;
this.splitAssignMode = splitAssignMode;
+ this.tableIdentifier = tableIdentifier;
}
@Override
@@ -85,7 +93,20 @@ public class SystemTableSource extends FlinkTableSource {
} else {
source = new StaticFileStoreSource(readBuilder, limit,
splitBatchSize, splitAssignMode);
}
- return SourceProvider.of(source);
+ return new PaimonDataStreamScanProvider(
+ source.getBoundedness() == Boundedness.BOUNDED,
+ env -> {
+ Integer parallelism = inferSourceParallelism(env);
+ DataStreamSource<RowData> dataStreamSource =
+ env.fromSource(
+ source,
+ WatermarkStrategy.noWatermarks(),
+ tableIdentifier.asSummaryString());
+ if (parallelism != null) {
+ dataStreamSource.setParallelism(parallelism);
+ }
+ return dataStreamSource;
+ });
}
@Override
@@ -97,7 +118,8 @@ public class SystemTableSource extends FlinkTableSource {
projectFields,
limit,
splitBatchSize,
- splitAssignMode);
+ splitAssignMode,
+ tableIdentifier);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java
index 91af8071e..b98762ae2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java
@@ -31,10 +31,15 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.InnerTableWrite;
import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.system.ReadOptimizedTable;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -47,6 +52,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.util.Collections;
+import java.util.Map;
import java.util.Optional;
import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
@@ -55,35 +61,12 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
/** Tests for {@link DataTableSource}. */
class DataTableSourceTest {
- @Test
- void testInferScanParallelism(@TempDir java.nio.file.Path path) throws
Exception {
- FileIO fileIO = LocalFileIO.create();
- Path tablePath = new Path(path.toString());
- SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
- TableSchema tableSchema =
- schemaManager.createTable(
- Schema.newBuilder()
- .column("a", DataTypes.INT())
- .column("b", DataTypes.BIGINT())
- .option("bucket", "1")
- .build());
- FileStoreTable fileStoreTable =
- FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
- InnerTableWrite writer = fileStoreTable.newWrite("test");
- TableCommitImpl commit = fileStoreTable.newCommit("test");
- writer.write(GenericRow.of(1, 2L));
- writer.write(GenericRow.of(3, 4L));
- writer.write(GenericRow.of(5, 6L));
- writer.write(GenericRow.of(7, 8L));
- writer.write(GenericRow.of(9, 10L));
- writer.write(GenericRow.of(11, 12L));
- writer.write(GenericRow.of(13, 14L));
- writer.write(GenericRow.of(15, 16L));
- writer.write(GenericRow.of(17, 18L));
- commit.commit(writer.prepareCommit());
+ @TempDir java.nio.file.Path path;
- commit.close();
- writer.close();
+ @Test
+ void testInferScanParallelism() throws Exception {
+ FileStoreTable fileStoreTable = createTable(ImmutableMap.of("bucket",
"1"));
+ writeData(fileStoreTable);
DataTableSource tableSource =
new DataTableSource(
@@ -92,28 +75,7 @@ class DataTableSourceTest {
true,
null,
null);
- PaimonDataStreamScanProvider runtimeProvider =
- (PaimonDataStreamScanProvider)
- tableSource.getScanRuntimeProvider(
- new ScanTableSource.ScanContext() {
- @Override
- public <T> TypeInformation<T>
createTypeInformation(
- DataType dataType) {
- throw new
UnsupportedOperationException();
- }
-
- @Override
- public <T> TypeInformation<T>
createTypeInformation(
- LogicalType logicalType) {
- throw new
UnsupportedOperationException();
- }
-
- @Override
- public
DynamicTableSource.DataStructureConverter
-
createDataStructureConverter(DataType dataType) {
- throw new
UnsupportedOperationException();
- }
- });
+ PaimonDataStreamScanProvider runtimeProvider =
runtimeProvider(tableSource);
StreamExecutionEnvironment sEnv1 =
StreamExecutionEnvironment.createLocalEnvironment();
DataStream<RowData> sourceStream1 =
runtimeProvider.produceDataStream(s -> Optional.empty(),
sEnv1);
@@ -134,4 +96,89 @@ class DataTableSourceTest {
assertThat(sourceStream2.getParallelism()).isNotEqualTo(1);
assertThat(sourceStream2.getParallelism()).isEqualTo(sEnv2.getParallelism());
}
+
+ @Test
+ public void testInferStreamParallelism() throws Exception {
+ FileStoreTable fileStoreTable = createTable(ImmutableMap.of("bucket",
"-1"));
+
+ DataTableSource tableSource =
+ new DataTableSource(
+ ObjectIdentifier.of("cat", "db", "table"),
+ fileStoreTable,
+ true,
+ null,
+ null);
+ PaimonDataStreamScanProvider runtimeProvider =
runtimeProvider(tableSource);
+
+ StreamExecutionEnvironment sEnv1 =
StreamExecutionEnvironment.createLocalEnvironment();
+ DataStream<RowData> sourceStream1 =
+ runtimeProvider.produceDataStream(s -> Optional.empty(),
sEnv1);
+ // parallelism = 1 for table with -1 bucket.
+ assertThat(sourceStream1.getParallelism()).isEqualTo(1);
+ }
+
+ @Test
+ public void testSystemTableParallelism() throws Exception {
+ FileStoreTable fileStoreTable =
+ createTable(ImmutableMap.of("bucket", "1", "scan.parallelism",
"3"));
+
+ ReadOptimizedTable ro = new ReadOptimizedTable(fileStoreTable);
+
+ SystemTableSource tableSource =
+ new SystemTableSource(ro, false, ObjectIdentifier.of("cat",
"db", "table"));
+ PaimonDataStreamScanProvider runtimeProvider =
runtimeProvider(tableSource);
+
+ Configuration configuration = new Configuration();
+ configuration.set(ExecutionOptions.RUNTIME_MODE,
RuntimeExecutionMode.BATCH);
+ StreamExecutionEnvironment sEnv1 =
StreamExecutionEnvironment.createLocalEnvironment();
+ DataStream<RowData> sourceStream1 =
+ runtimeProvider.produceDataStream(s -> Optional.empty(),
sEnv1);
+ assertThat(sourceStream1.getParallelism()).isEqualTo(3);
+ }
+
+ private PaimonDataStreamScanProvider runtimeProvider(FlinkTableSource
tableSource) {
+ return (PaimonDataStreamScanProvider)
+ tableSource.getScanRuntimeProvider(
+ new ScanTableSource.ScanContext() {
+ @Override
+ public <T> TypeInformation<T>
createTypeInformation(DataType dataType) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> TypeInformation<T>
createTypeInformation(
+ LogicalType logicalType) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DynamicTableSource.DataStructureConverter
+ createDataStructureConverter(DataType
dataType) {
+ throw new UnsupportedOperationException();
+ }
+ });
+ }
+
+ private FileStoreTable createTable(Map<String, String> options) throws
Exception {
+ FileIO fileIO = LocalFileIO.create();
+ Path tablePath = new Path(path.toString());
+ SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+ TableSchema tableSchema =
+ schemaManager.createTable(
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.BIGINT())
+ .options(options)
+ .build());
+ return FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
+ }
+
+ private void writeData(FileStoreTable table) throws Exception {
+ InnerTableWrite writer = table.newWrite("test");
+ TableCommitImpl commit = table.newCommit("test");
+ writer.write(GenericRow.of(1, 2L));
+ commit.commit(writer.prepareCommit());
+ commit.close();
+ writer.close();
+ }
}