This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 7fdfc4539417ec7b97bcda97de92a52b457001c7 Author: JingsongLi <[email protected]> AuthorDate: Sat Mar 18 10:18:27 2023 +0800 [core] Rename paimon: TableStore to Paimon 2 --- .../java/org/apache/paimon/utils/JsonSerdeUtil.java | 4 ++-- ...rovider.java => PaimonDataStreamScanProvider.java} | 4 ++-- ...rovider.java => PaimonDataStreamSinkProvider.java} | 5 ++--- ...oreFactory.java => AbstractFlinkTableFactory.java} | 10 +++++----- .../java/org/apache/paimon/flink/FlinkCatalog.java | 2 +- ...reConnectorFactory.java => FlinkTableFactory.java} | 10 +++++----- ...rovider.java => PaimonDataStreamScanProvider.java} | 4 ++-- ...rovider.java => PaimonDataStreamSinkProvider.java} | 5 ++--- .../org/apache/paimon/flink/action/ActionBase.java | 2 +- .../apache/paimon/flink/action/MergeIntoAction.java | 3 +-- .../sink/{TableStoreSink.java => FlinkTableSink.java} | 14 +++++++------- .../{TableStoreSource.java => DataTableSource.java} | 16 ++++++++-------- .../apache/paimon/flink/source/SystemTableSource.java | 2 +- .../services/org.apache.flink.table.factories.Factory | 2 +- ...ryTest.java => AbstractFlinkTableFactoryTest.java} | 6 +++--- .../org/apache/paimon/flink/ChangelogModeTest.java | 10 +++++----- .../org/apache/paimon/flink/CreateTableITCase.java | 2 +- .../java/org/apache/paimon/flink/DropTableITCase.java | 2 +- .../{TableStoreTestBase.java => FlinkTestBase.java} | 2 +- .../org/apache/paimon/flink/ReadWriteTableITCase.java | 12 ++++++------ .../main/java/org/apache/paimon/hive/HiveCatalog.java | 6 +++--- .../java/org/apache/paimon/hive/PaimonMetaHook.java | 8 ++++---- .../main/java/org/apache/paimon/hive/PaimonSerDe.java | 2 +- .../org/apache/paimon/hive/PaimonStorageHandler.java | 8 ++++---- ...leStoreInputFormat.java => PaimonInputFormat.java} | 10 +++++----- ...ableStoreInputSplit.java => PaimonInputSplit.java} | 8 ++++---- ...StoreOutputFormat.java => PaimonOutputFormat.java} | 2 +- ...StoreRecordReader.java => PaimonRecordReader.java} | 6 +++--- .../objectinspector/PaimonRowDataObjectInspector.java | 14 +++++++------- .../paimon/hive/PaimonStorageHandlerITCase.java | 6 ++---- ...eInputSplitTest.java => PaimonInputSplitTest.java} | 10 +++++----- ...ordReaderTest.java => PaimonRecordReaderTest.java} | 19 +++++++++---------- 32 files changed, 105 insertions(+), 111 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java index 4a61dde66..dab9d1692 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java @@ -49,7 +49,7 @@ public class JsonSerdeUtil { static { OBJECT_MAPPER_INSTANCE = new ObjectMapper(); - OBJECT_MAPPER_INSTANCE.registerModule(createTableStoreJacksonModule()); + OBJECT_MAPPER_INSTANCE.registerModule(createPaimonJacksonModule()); } public static <T> T fromJson(String json, Class<T> clazz) { @@ -76,7 +76,7 @@ public class JsonSerdeUtil { } } - private static Module createTableStoreJacksonModule() { + private static Module createPaimonJacksonModule() { SimpleModule module = new SimpleModule("Paimon"); registerJsonObjects( module, TableSchema.class, SchemaSerializer.INSTANCE, SchemaSerializer.INSTANCE); diff --git a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/TableStoreDataStreamScanProvider.java b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java similarity index 93% rename from paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/TableStoreDataStreamScanProvider.java rename to paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java index a742f7f02..b7bea8e17 100644 --- a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/TableStoreDataStreamScanProvider.java +++ b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java @@ -26,12 +26,12 @@ import org.apache.flink.table.data.RowData; import java.util.function.Function; /** Paimon {@link DataStreamScanProvider}. */ -public class TableStoreDataStreamScanProvider implements DataStreamScanProvider { +public class PaimonDataStreamScanProvider implements DataStreamScanProvider { private final boolean isBounded; private final Function<StreamExecutionEnvironment, DataStream<RowData>> producer; - public TableStoreDataStreamScanProvider( + public PaimonDataStreamScanProvider( boolean isBounded, Function<StreamExecutionEnvironment, DataStream<RowData>> producer) { this.isBounded = isBounded; this.producer = producer; diff --git a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/TableStoreDataStreamSinkProvider.java b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java similarity index 87% rename from paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/TableStoreDataStreamSinkProvider.java rename to paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java index 6d72489be..34aa8991d 100644 --- a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/TableStoreDataStreamSinkProvider.java +++ b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java @@ -26,12 +26,11 @@ import org.apache.flink.table.data.RowData; import java.util.function.Function; /** Paimon {@link DataStreamSinkProvider}. */ -public class TableStoreDataStreamSinkProvider implements DataStreamSinkProvider { +public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider { private final Function<DataStream<RowData>, DataStreamSink<?>> producer; - public TableStoreDataStreamSinkProvider( - Function<DataStream<RowData>, DataStreamSink<?>> producer) { + public PaimonDataStreamSinkProvider(Function<DataStream<RowData>, DataStreamSink<?>> producer) { this.producer = producer; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractTableStoreFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java similarity index 97% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractTableStoreFactory.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java index 1624de1a3..763f77ff5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractTableStoreFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java @@ -35,9 +35,9 @@ import org.apache.paimon.CoreOptions.LogConsistency; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.flink.log.LogStoreTableFactory; -import org.apache.paimon.flink.sink.TableStoreSink; +import org.apache.paimon.flink.sink.FlinkTableSink; +import org.apache.paimon.flink.source.DataTableSource; import org.apache.paimon.flink.source.SystemTableSource; -import org.apache.paimon.flink.source.TableStoreSource; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.TableSchema; @@ -60,7 +60,7 @@ import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; import static org.apache.paimon.flink.log.LogStoreTableFactory.discoverLogStoreFactory; /** Abstract paimon factory to create table source and table sink. */ -public abstract class AbstractTableStoreFactory +public abstract class AbstractFlinkTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { @Override @@ -72,7 +72,7 @@ public abstract class AbstractTableStoreFactory if (origin instanceof SystemCatalogTable) { return new SystemTableSource(((SystemCatalogTable) origin).table(), isStreamingMode); } else { - return new TableStoreSource( + return new DataTableSource( context.getObjectIdentifier(), buildFileStoreTable(context), isStreamingMode, @@ -83,7 +83,7 @@ public abstract class AbstractTableStoreFactory @Override public DynamicTableSink createDynamicTableSink(Context context) { - return new TableStoreSink( + return new FlinkTableSink( context.getObjectIdentifier(), buildFileStoreTable(context), context, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index f73317c55..1d42d3c24 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -88,7 +88,7 @@ public class FlinkCatalog extends AbstractCatalog { @Override public Optional<Factory> getFactory() { - return Optional.of(new TableStoreConnectorFactory(catalog.lockFactory().orElse(null))); + return Optional.of(new FlinkTableFactory(catalog.lockFactory().orElse(null))); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableStoreConnectorFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java similarity index 92% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableStoreConnectorFactory.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java index 3d70000f0..d2387e0cc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableStoreConnectorFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java @@ -26,7 +26,7 @@ import org.apache.flink.table.factories.FactoryUtil; import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.CatalogLock; -import org.apache.paimon.flink.sink.TableStoreSink; +import org.apache.paimon.flink.sink.FlinkTableSink; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; @@ -38,15 +38,15 @@ import static org.apache.paimon.CoreOptions.AUTO_CREATE; import static org.apache.paimon.flink.FlinkCatalogFactory.IDENTIFIER; /** A paimon {@link DynamicTableFactory} to create source and sink. */ -public class TableStoreConnectorFactory extends AbstractTableStoreFactory { +public class FlinkTableFactory extends AbstractFlinkTableFactory { @Nullable private final CatalogLock.Factory lockFactory; - public TableStoreConnectorFactory() { + public FlinkTableFactory() { this(null); } - public TableStoreConnectorFactory(@Nullable CatalogLock.Factory lockFactory) { + public FlinkTableFactory(@Nullable CatalogLock.Factory lockFactory) { this.lockFactory = lockFactory; } @@ -84,7 +84,7 @@ public class TableStoreConnectorFactory extends AbstractTableStoreFactory { context.isTemporary()); } createTableIfNeeded(context); - TableStoreSink sink = (TableStoreSink) super.createDynamicTableSink(context); + FlinkTableSink sink = (FlinkTableSink) super.createDynamicTableSink(context); sink.setLockFactory(lockFactory); return sink; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableStoreDataStreamScanProvider.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java similarity index 93% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableStoreDataStreamScanProvider.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java index 7ab932352..af9df5b93 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableStoreDataStreamScanProvider.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamScanProvider.java @@ -27,12 +27,12 @@ import org.apache.flink.table.data.RowData; import java.util.function.Function; /** Paimon {@link DataStreamScanProvider}. */ -public class TableStoreDataStreamScanProvider implements DataStreamScanProvider { +public class PaimonDataStreamScanProvider implements DataStreamScanProvider { private final boolean isBounded; private final Function<StreamExecutionEnvironment, DataStream<RowData>> producer; - public TableStoreDataStreamScanProvider( + public PaimonDataStreamScanProvider( boolean isBounded, Function<StreamExecutionEnvironment, DataStream<RowData>> producer) { this.isBounded = isBounded; this.producer = producer; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableStoreDataStreamSinkProvider.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java similarity index 88% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableStoreDataStreamSinkProvider.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java index fa4c6da7b..05eaacf5a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableStoreDataStreamSinkProvider.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PaimonDataStreamSinkProvider.java @@ -27,12 +27,11 @@ import org.apache.flink.table.data.RowData; import java.util.function.Function; /** Paimon {@link DataStreamSinkProvider}. */ -public class TableStoreDataStreamSinkProvider implements DataStreamSinkProvider { +public class PaimonDataStreamSinkProvider implements DataStreamSinkProvider { private final Function<DataStream<RowData>, DataStreamSink<?>> producer; - public TableStoreDataStreamSinkProvider( - Function<DataStream<RowData>, DataStreamSink<?>> producer) { + public PaimonDataStreamSinkProvider(Function<DataStream<RowData>, DataStreamSink<?>> producer) { this.producer = producer; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java index 7d5e935f9..8ea16956e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java @@ -105,7 +105,7 @@ public abstract class ActionBase implements Action { * Extract {@link LogicalType}s from Flink {@link org.apache.flink.table.types.DataType}s and * convert to Paimon {@link DataType}s. */ - protected List<DataType> toTableStoreDataTypes( + protected List<DataType> toPaimonTypes( List<org.apache.flink.table.types.DataType> flinkDataTypes) { return flinkDataTypes.stream() .map(org.apache.flink.table.types.DataType::getLogicalType) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java index 7870702b0..9fee82607 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java @@ -667,8 +667,7 @@ public class MergeIntoAction extends ActionBase { } private void checkSchema(String action, Table source) { - List<DataType> actualTypes = - toTableStoreDataTypes(source.getResolvedSchema().getColumnDataTypes()); + List<DataType> actualTypes = toPaimonTypes(source.getResolvedSchema().getColumnDataTypes()); List<DataType> expectedTypes = this.table.rowType().getFieldTypes(); if (!compatibleCheck(actualTypes, expectedTypes)) { throw new IllegalStateException( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableStoreSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java similarity index 95% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableStoreSink.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java index 9dcbe349b..1b4838c41 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableStoreSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java @@ -33,7 +33,7 @@ import org.apache.paimon.CoreOptions.MergeEngine; import org.apache.paimon.catalog.CatalogLock; import org.apache.paimon.flink.FlinkCatalog; import org.apache.paimon.flink.FlinkConnectorOptions; -import org.apache.paimon.flink.TableStoreDataStreamSinkProvider; +import org.apache.paimon.flink.PaimonDataStreamSinkProvider; import org.apache.paimon.flink.log.LogSinkProvider; import org.apache.paimon.flink.log.LogStoreTableFactory; import org.apache.paimon.operation.Lock; @@ -53,7 +53,7 @@ import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE; import static org.apache.paimon.CoreOptions.MERGE_ENGINE; /** Table sink to create sink. */ -public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning { +public class FlinkTableSink implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning { private final ObjectIdentifier tableIdentifier; private final FileStoreTable table; @@ -64,7 +64,7 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp private boolean overwrite = false; @Nullable private CatalogLock.Factory lockFactory; - public TableStoreSink( + public FlinkTableSink( ObjectIdentifier tableIdentifier, FileStoreTable table, DynamicTableFactory.Context context, @@ -128,7 +128,7 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp // Do not sink to log store when overwrite mode final LogSinkFunction logSinkFunction = overwrite ? null : (logSinkProvider == null ? null : logSinkProvider.createSink()); - return new TableStoreDataStreamSinkProvider( + return new PaimonDataStreamSinkProvider( (dataStream) -> new FlinkSinkBuilder(table) .withInput( @@ -148,8 +148,8 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp @Override public DynamicTableSink copy() { - TableStoreSink copied = - new TableStoreSink(tableIdentifier, table, context, logStoreTableFactory); + FlinkTableSink copied = + new FlinkTableSink(tableIdentifier, table, context, logStoreTableFactory); copied.staticPartitions = new HashMap<>(staticPartitions); copied.overwrite = overwrite; copied.lockFactory = lockFactory; @@ -158,7 +158,7 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp @Override public String asSummaryString() { - return "TableStoreSink"; + return "PaimonSink"; } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/TableStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java similarity index 95% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/TableStoreSource.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java index 1e7b0a6f9..390942e36 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/TableStoreSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java @@ -31,7 +31,7 @@ import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.CoreOptions.LogChangelogMode; import org.apache.paimon.CoreOptions.LogConsistency; import org.apache.paimon.flink.FlinkConnectorOptions; -import org.apache.paimon.flink.TableStoreDataStreamScanProvider; +import org.apache.paimon.flink.PaimonDataStreamScanProvider; import org.apache.paimon.flink.log.LogSourceProvider; import org.apache.paimon.flink.log.LogStoreTableFactory; import org.apache.paimon.flink.lookup.FileStoreLookupFunction; @@ -60,7 +60,7 @@ import static org.apache.paimon.CoreOptions.LOG_SCAN_REMOVE_NORMALIZE; * org.apache.flink.connector.base.source.hybrid.HybridSource} of {@link StaticFileStoreSource} and * kafka log source created by {@link LogSourceProvider}. */ -public class TableStoreSource extends FlinkTableSource +public class DataTableSource extends FlinkTableSource implements LookupTableSource, SupportsWatermarkPushDown { private final ObjectIdentifier tableIdentifier; @@ -71,7 +71,7 @@ public class TableStoreSource extends FlinkTableSource @Nullable private WatermarkStrategy<RowData> watermarkStrategy; - public TableStoreSource( + public DataTableSource( ObjectIdentifier tableIdentifier, FileStoreTable table, boolean streaming, @@ -89,7 +89,7 @@ public class TableStoreSource extends FlinkTableSource null); } - private TableStoreSource( + private DataTableSource( ObjectIdentifier tableIdentifier, FileStoreTable table, boolean streaming, @@ -135,7 +135,7 @@ public class TableStoreSource extends FlinkTableSource } // optimization: transaction consistency and all changelog mode avoid the generation of - // normalized nodes. See TableStoreSink.getChangelogMode validation. + // normalized nodes. See FlinkTableSink.getChangelogMode validation. return options.get(LOG_CONSISTENCY) == LogConsistency.TRANSACTIONAL && options.get(LOG_CHANGELOG_MODE) == LogChangelogMode.ALL ? ChangelogMode.all() @@ -168,13 +168,13 @@ public class TableStoreSource extends FlinkTableSource .get(FlinkConnectorOptions.SCAN_PARALLELISM)) .withWatermarkStrategy(watermarkStrategy); - return new TableStoreDataStreamScanProvider( + return new PaimonDataStreamScanProvider( !streaming, env -> sourceBuilder.withEnv(env).build()); } @Override public DynamicTableSource copy() { - return new TableStoreSource( + return new DataTableSource( tableIdentifier, table, streaming, @@ -188,7 +188,7 @@ public class TableStoreSource extends FlinkTableSource @Override public String asSummaryString() { - return "TableStore-DataSource"; + return "Paimon-DataSource"; } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java index 94e2d968d..dda054fed 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java @@ -86,6 +86,6 @@ public class SystemTableSource extends FlinkTableSource { @Override public String asSummaryString() { - return "TableStore-SystemTable-Source"; + return "Paimon-SystemTable-Source"; } } diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 0502da679..b4522f01a 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -13,6 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.paimon.flink.TableStoreConnectorFactory +org.apache.paimon.flink.FlinkTableFactory org.apache.paimon.flink.FlinkCatalogFactory org.apache.paimon.flink.kafka.KafkaLogStoreFactory diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractTableStoreFactoryTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java similarity index 93% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractTableStoreFactoryTest.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java index d88fd5b59..332c1bb9b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractTableStoreFactoryTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AbstractFlinkTableFactoryTest.java @@ -29,8 +29,8 @@ import java.util.Arrays; import static org.assertj.core.api.Assertions.assertThat; -/** Test for {@link AbstractTableStoreFactory}. */ -public class AbstractTableStoreFactoryTest { +/** Test for {@link AbstractFlinkTableFactory}. */ +public class AbstractFlinkTableFactoryTest { @Test public void testSchemaEquals() { @@ -55,6 +55,6 @@ public class AbstractTableStoreFactoryTest { } private void innerTest(RowType r1, RowType r2, boolean expectEquals) { - assertThat(AbstractTableStoreFactory.schemaEquals(r1, r2)).isEqualTo(expectEquals); + assertThat(AbstractFlinkTableFactory.schemaEquals(r1, r2)).isEqualTo(expectEquals); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java index 3b8e255df..8f35a145f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogModeTest.java @@ -24,8 +24,8 @@ import org.apache.flink.table.connector.ChangelogMode; import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.kafka.KafkaLogStoreFactory; import org.apache.paimon.flink.log.LogStoreTableFactory; -import org.apache.paimon.flink.sink.TableStoreSink; -import org.apache.paimon.flink.source.TableStoreSource; +import org.apache.paimon.flink.sink.FlinkTableSink; +import org.apache.paimon.flink.source.DataTableSource; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.options.Options; @@ -80,11 +80,11 @@ public class ChangelogModeTest { "")); FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), path); - TableStoreSource source = - new TableStoreSource(identifier, table, true, null, logStoreTableFactory); + DataTableSource source = + new DataTableSource(identifier, table, true, null, logStoreTableFactory); assertThat(source.getChangelogMode()).isEqualTo(expectSource); - TableStoreSink sink = new TableStoreSink(identifier, table, null, null); + FlinkTableSink sink = new FlinkTableSink(identifier, table, null, null); assertThat(sink.getChangelogMode(ChangelogMode.all())).isEqualTo(expectSink); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CreateTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CreateTableITCase.java index 909905bb4..2050d20c3 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CreateTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CreateTableITCase.java @@ -37,7 +37,7 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; import static org.junit.jupiter.params.provider.Arguments.arguments; /** IT cases for testing create managed table ddl. */ -public class CreateTableITCase extends TableStoreTestBase { +public class CreateTableITCase extends FlinkTestBase { @Override public void prepareEnv( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DropTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DropTableITCase.java index 550d25a1b..44fbbe2d2 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DropTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DropTableITCase.java @@ -36,7 +36,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.params.provider.Arguments.arguments; /** IT cases for testing drop managed table ddl. */ -public class DropTableITCase extends TableStoreTestBase { +public class DropTableITCase extends FlinkTestBase { @Override public void prepareEnv( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TableStoreTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java similarity index 99% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TableStoreTestBase.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java index 14426266e..0769dcfd9 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/TableStoreTestBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkTestBase.java @@ -50,7 +50,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; /** End-to-end test base for paimon. */ -public abstract class TableStoreTestBase extends AbstractTestBase { +public abstract class FlinkTestBase extends AbstractTestBase { public static final String CURRENT_CATALOG = "catalog"; public static final String CURRENT_DATABASE = "default"; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java index 1d823b378..df70ec588 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java @@ -39,7 +39,7 @@ import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.types.Row; import org.apache.paimon.CoreOptions; -import org.apache.paimon.flink.sink.TableStoreSink; +import org.apache.paimon.flink.sink.FlinkTableSink; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; @@ -57,10 +57,10 @@ import java.util.Map; import java.util.UUID; import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow; -import static org.apache.paimon.flink.AbstractTableStoreFactory.buildFileStoreTable; +import static org.apache.paimon.flink.AbstractFlinkTableFactory.buildFileStoreTable; import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM; import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM; -import static org.apache.paimon.flink.TableStoreTestBase.createResolvedTable; +import static org.apache.paimon.flink.FlinkTestBase.createResolvedTable; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bExeEnv; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildQuery; @@ -1154,9 +1154,9 @@ public class ReadWriteTableITCase extends AbstractTestBase { .createTable(FlinkCatalog.fromCatalogTable(context.getCatalogTable())); DynamicTableSink tableSink = - new TableStoreSink( + new FlinkTableSink( context.getObjectIdentifier(), buildFileStoreTable(context), context, null); - assertThat(tableSink).isInstanceOf(TableStoreSink.class); + assertThat(tableSink).isInstanceOf(FlinkTableSink.class); // 2. get sink provider DynamicTableSink.SinkRuntimeProvider provider = @@ -1169,7 +1169,7 @@ public class ReadWriteTableITCase extends AbstractTestBase { bExeEnv.fromCollection(Collections.singletonList(GenericRowData.of())); DataStreamSink<?> sink = sinkProvider.consumeDataStream(null, mockSource); Transformation<?> transformation = sink.getTransformation(); - // until a PartitionTransformation, see TableStore.SinkBuilder.build() + // until a PartitionTransformation, see FlinkSinkBuilder.build() while (!(transformation instanceof PartitionTransformation)) { assertThat(transformation.getParallelism()).isIn(1, expectedParallelism); transformation = transformation.getInputs().get(0); diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 9ff6c3d39..286d9908d 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -72,9 +72,9 @@ public class HiveCatalog extends AbstractCatalog { // we don't include paimon-hive-connector as dependencies because it depends on // hive-exec private static final String INPUT_FORMAT_CLASS_NAME = - "org.apache.paimon.hive.mapred.TableStoreInputFormat"; + "org.apache.paimon.hive.mapred.PaimonInputFormat"; private static final String OUTPUT_FORMAT_CLASS_NAME = - "org.apache.paimon.hive.mapred.TableStoreOutputFormat"; + "org.apache.paimon.hive.mapred.PaimonOutputFormat"; private static final String SERDE_CLASS_NAME = "org.apache.paimon.hive.PaimonSerDe"; private static final String STORAGE_HANDLER_CLASS_NAME = "org.apache.paimon.hive.PaimonStorageHandler"; @@ -162,7 +162,7 @@ public class HiveCatalog extends AbstractCatalog { .filter( tableName -> { Identifier identifier = new Identifier(databaseName, tableName); - // the environment here may not be able to access non-TableStore + // the environment here may not be able to access non-paimon // tables. // so we just check the schema file first return schemaFileExists(identifier) diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java index e27292382..1b8c8baa2 100644 --- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java @@ -21,8 +21,8 @@ package org.apache.paimon.hive; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.paimon.hive.mapred.TableStoreInputFormat; -import org.apache.paimon.hive.mapred.TableStoreOutputFormat; +import org.apache.paimon.hive.mapred.PaimonInputFormat; +import org.apache.paimon.hive.mapred.PaimonOutputFormat; import org.apache.paimon.utils.Preconditions; /** @@ -39,8 +39,8 @@ public class PaimonMetaHook implements HiveMetaHook { + "with PARTITIONED BY clause. If you want to query from a partitioned table, " + "please add partition columns into the ordinary table columns."); - table.getSd().setInputFormat(TableStoreInputFormat.class.getCanonicalName()); - table.getSd().setOutputFormat(TableStoreOutputFormat.class.getCanonicalName()); + table.getSd().setInputFormat(PaimonInputFormat.class.getCanonicalName()); + table.getSd().setOutputFormat(PaimonOutputFormat.class.getCanonicalName()); } @Override diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonSerDe.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonSerDe.java index 2e4297352..5e2d45745 100644 --- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonSerDe.java +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonSerDe.java @@ -56,7 +56,7 @@ public class PaimonSerDe extends AbstractSerDe { @Override public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException { throw new UnsupportedOperationException( - "TableStoreSerDe currently only supports deserialization."); + "PaimonSerDe currently only supports deserialization."); } @Override diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java index 37cf0cf4e..1ee77bc6e 100644 --- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java @@ -32,8 +32,8 @@ import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; -import org.apache.paimon.hive.mapred.TableStoreInputFormat; -import org.apache.paimon.hive.mapred.TableStoreOutputFormat; +import org.apache.paimon.hive.mapred.PaimonInputFormat; +import org.apache.paimon.hive.mapred.PaimonOutputFormat; import java.util.Map; import java.util.Properties; @@ -45,12 +45,12 @@ public class PaimonStorageHandler implements HiveStoragePredicateHandler, HiveSt @Override public Class<? extends InputFormat> getInputFormatClass() { - return TableStoreInputFormat.class; + return PaimonInputFormat.class; } @Override public Class<? extends OutputFormat> getOutputFormatClass() { - return TableStoreOutputFormat.class; + return PaimonOutputFormat.class; } @Override diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreInputFormat.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java similarity index 92% rename from paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreInputFormat.java rename to paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java index 1712edfd4..50d2a8452 100644 --- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreInputFormat.java +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java @@ -47,7 +47,7 @@ import java.util.Optional; * {@link InputFormat} for paimon. It divides all files into {@link InputSplit}s (one split per * bucket) and creates {@link RecordReader} for each split. */ -public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer> { +public class PaimonInputFormat implements InputFormat<Void, RowDataContainer> { @Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) { @@ -55,18 +55,18 @@ public class TableStoreInputFormat implements InputFormat<Void, RowDataContainer DataTableScan scan = table.newScan(); createPredicate(table.schema(), jobConf).ifPresent(scan::withFilter); return scan.plan().splits.stream() - .map(split -> new TableStoreInputSplit(table.location().toString(), split)) - .toArray(TableStoreInputSplit[]::new); + .map(split -> new PaimonInputSplit(table.location().toString(), split)) + .toArray(PaimonInputSplit[]::new); } @Override public RecordReader<Void, RowDataContainer> getRecordReader( InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException { FileStoreTable table = createFileStoreTable(jobConf); - TableStoreInputSplit split = (TableStoreInputSplit) inputSplit; + PaimonInputSplit split = (PaimonInputSplit) inputSplit; ReadBuilder readBuilder = table.newReadBuilder(); createPredicate(table.schema(), jobConf).ifPresent(readBuilder::withFilter); - return new TableStoreRecordReader( + return new PaimonRecordReader( readBuilder, split, table.schema().fieldNames(), diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreInputSplit.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java similarity index 93% rename from paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreInputSplit.java rename to paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java index a7fe4e6ef..99871d1e6 100644 --- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreInputSplit.java +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputSplit.java @@ -33,7 +33,7 @@ import java.util.Objects; /** * {@link FileSplit} for paimon. It contains all files to read from a certain partition and bucket. */ -public class TableStoreInputSplit extends FileSplit { +public class PaimonInputSplit extends FileSplit { private static final String[] ANYWHERE = new String[] {"*"}; @@ -41,9 +41,9 @@ public class TableStoreInputSplit extends FileSplit { private DataSplit split; // public no-argument constructor for deserialization - public TableStoreInputSplit() {} + public PaimonInputSplit() {} - public TableStoreInputSplit(String path, DataSplit split) { + public PaimonInputSplit(String path, DataSplit split) { this.path = path; this.split = split; } @@ -103,7 +103,7 @@ public class TableStoreInputSplit extends FileSplit { if (o == null || getClass() != o.getClass()) { return false; } - TableStoreInputSplit that = (TableStoreInputSplit) o; + PaimonInputSplit that = (PaimonInputSplit) o; return Objects.equals(path, that.path) && Objects.equals(split, that.split); } diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreOutputFormat.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java similarity index 94% rename from paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreOutputFormat.java rename to paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java index b1119993d..5e33bc5df 100644 --- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreOutputFormat.java +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java @@ -28,7 +28,7 @@ import org.apache.paimon.data.InternalRow; import java.io.IOException; /** {@link OutputFormat} for table split. Currently useless. */ -public class TableStoreOutputFormat implements OutputFormat<InternalRow, InternalRow> { +public class PaimonOutputFormat implements OutputFormat<InternalRow, InternalRow> { @Override public RecordWriter<InternalRow, InternalRow> getRecordWriter( diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreRecordReader.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java similarity index 95% rename from paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreRecordReader.java rename to paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java index 5485b620a..812f946d2 100644 --- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/TableStoreRecordReader.java +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordReader.java @@ -39,7 +39,7 @@ import java.util.List; * columnNames} this reader will still produce records of the original schema. However, columns not * in {@code selectedColumns} will be null. */ -public class TableStoreRecordReader implements RecordReader<Void, RowDataContainer> { +public class PaimonRecordReader implements RecordReader<Void, RowDataContainer> { private final RecordReaderIterator<InternalRow> iterator; private final long splitLength; @@ -48,9 +48,9 @@ public class TableStoreRecordReader implements RecordReader<Void, RowDataContain private float progress; - public TableStoreRecordReader( + public PaimonRecordReader( ReadBuilder readBuilder, - TableStoreInputSplit split, + PaimonInputSplit split, List<String> columnNames, List<String> selectedColumns) throws IOException { diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonRowDataObjectInspector.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonRowDataObjectInspector.java index e9dc491ad..6d61daf80 100644 --- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonRowDataObjectInspector.java +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonRowDataObjectInspector.java @@ -35,8 +35,8 @@ import java.util.stream.Collectors; /** {@link StructObjectInspector} for {@link InternalRow}. */ public class PaimonRowDataObjectInspector extends StructObjectInspector { - private final List<TableStoreStructField> structFields; - private final Map<String, TableStoreStructField> structFieldMap; + private final List<PaimonStructField> structFields; + private final Map<String, PaimonStructField> structFieldMap; private final String typeName; public PaimonRowDataObjectInspector( @@ -48,8 +48,8 @@ public class PaimonRowDataObjectInspector extends StructObjectInspector { for (int i = 0; i < fieldNames.size(); i++) { String name = fieldNames.get(i); DataType logicalType = fieldTypes.get(i); - TableStoreStructField structField = - new TableStoreStructField( + PaimonStructField structField = + new PaimonStructField( name, PaimonObjectInspectorFactory.create(logicalType), i, @@ -84,7 +84,7 @@ public class PaimonRowDataObjectInspector extends StructObjectInspector { @Override public Object getStructFieldData(Object o, StructField structField) { InternalRow rowData = (InternalRow) o; - return ((TableStoreStructField) structField).fieldGetter.getFieldOrNull(rowData); + return ((PaimonStructField) structField).fieldGetter.getFieldOrNull(rowData); } @Override @@ -105,7 +105,7 @@ public class PaimonRowDataObjectInspector extends StructObjectInspector { return Category.STRUCT; } - private static class TableStoreStructField implements StructField { + private static class PaimonStructField implements StructField { private final String name; private final ObjectInspector objectInspector; @@ -113,7 +113,7 @@ public class PaimonRowDataObjectInspector extends StructObjectInspector { private final InternalRow.FieldGetter fieldGetter; private final String comment; - private TableStoreStructField( + private PaimonStructField( String name, ObjectInspector objectInspector, int idx, diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java index 0278d0736..1430d5555 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java @@ -33,6 +33,7 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.hive.mapred.PaimonInputFormat; import org.apache.paimon.hive.objectinspector.PaimonObjectInspectorFactory; import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; @@ -61,10 +62,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; -/** - * IT cases for {@link PaimonStorageHandler} and {@link - * org.apache.paimon.hive.mapred.TableStoreInputFormat}. - */ +/** IT cases for {@link PaimonStorageHandler} and {@link PaimonInputFormat}. */ @RunWith(FlinkEmbeddedHiveRunner.class) public class PaimonStorageHandlerITCase { diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/TableStoreInputSplitTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java similarity index 92% rename from paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/TableStoreInputSplitTest.java rename to paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java index e9a79b274..2c8574edb 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/TableStoreInputSplitTest.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonInputSplitTest.java @@ -35,8 +35,8 @@ import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link TableStoreInputSplit}. */ -public class TableStoreInputSplitTest { +/** Tests for {@link PaimonInputSplit}. */ +public class PaimonInputSplitTest { @TempDir java.nio.file.Path tempDir; @@ -50,8 +50,8 @@ public class TableStoreInputSplitTest { } BinaryRow wantedPartition = generated.get(0).partition; - TableStoreInputSplit split = - new TableStoreInputSplit( + PaimonInputSplit split = + new PaimonInputSplit( tempDir.toString(), new DataSplit( ThreadLocalRandom.current().nextLong(100), @@ -70,7 +70,7 @@ public class TableStoreInputSplitTest { ByteArrayInputStream bais = new ByteArrayInputStream(bytes); DataInputStream input = new DataInputStream(bais); - TableStoreInputSplit actual = new TableStoreInputSplit(); + PaimonInputSplit actual = new PaimonInputSplit(); actual.readFields(input); assertThat(actual).isEqualTo(split); } diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/TableStoreRecordReaderTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java similarity index 92% rename from paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/TableStoreRecordReaderTest.java rename to paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java index 2053d670e..b23796647 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/TableStoreRecordReaderTest.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonRecordReaderTest.java @@ -49,8 +49,8 @@ import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link TableStoreRecordReader}. */ -public class TableStoreRecordReaderTest { +/** Tests for {@link PaimonRecordReader}. */ +public class PaimonRecordReaderTest { @TempDir java.nio.file.Path tempDir; private String commitUser; @@ -83,7 +83,7 @@ public class TableStoreRecordReaderTest { write.write(GenericRow.ofKind(RowKind.DELETE, 2L, BinaryString.fromString("Hello"))); commit.commit(0, write.prepareCommit(true, 0)); - TableStoreRecordReader reader = read(table, BinaryRow.EMPTY_ROW, 0); + PaimonRecordReader reader = read(table, BinaryRow.EMPTY_ROW, 0); RowDataContainer container = reader.createValue(); Set<String> actual = new HashSet<>(); while (reader.next(null, container)) { @@ -122,7 +122,7 @@ public class TableStoreRecordReaderTest { write.write(GenericRow.of(1, BinaryString.fromString("Hi"))); commit.commit(0, write.prepareCommit(true, 0)); - TableStoreRecordReader reader = read(table, BinaryRow.EMPTY_ROW, 0); + PaimonRecordReader reader = read(table, BinaryRow.EMPTY_ROW, 0); RowDataContainer container = reader.createValue(); Map<String, Integer> actual = new HashMap<>(); while (reader.next(null, container)) { @@ -160,8 +160,7 @@ public class TableStoreRecordReaderTest { write.write(GenericRow.of(1, 10L, BinaryString.fromString("Hi"))); commit.commit(0, write.prepareCommit(true, 0)); - TableStoreRecordReader reader = - read(table, BinaryRow.EMPTY_ROW, 0, Arrays.asList("c", "a")); + PaimonRecordReader reader = read(table, BinaryRow.EMPTY_ROW, 0, Arrays.asList("c", "a")); RowDataContainer container = reader.createValue(); Map<String, Integer> actual = new HashMap<>(); while (reader.next(null, container)) { @@ -176,19 +175,19 @@ public class TableStoreRecordReaderTest { assertThat(actual).isEqualTo(expected); } - private TableStoreRecordReader read(FileStoreTable table, BinaryRow partition, int bucket) + private PaimonRecordReader read(FileStoreTable table, BinaryRow partition, int bucket) throws Exception { return read(table, partition, bucket, table.schema().fieldNames()); } - private TableStoreRecordReader read( + private PaimonRecordReader read( FileStoreTable table, BinaryRow partition, int bucket, List<String> selectedColumns) throws Exception { for (DataSplit split : table.newScan().plan().splits) { if (split.partition().equals(partition) && split.bucket() == bucket) { - return new TableStoreRecordReader( + return new PaimonRecordReader( table.newReadBuilder(), - new TableStoreInputSplit(tempDir.toString(), split), + new PaimonInputSplit(tempDir.toString(), split), table.schema().fieldNames(), selectedColumns); }
