This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f8a80e7b9 [hive] Introduce Hive writer (#752)
f8a80e7b9 is described below
commit f8a80e7b9ea5dfd70f1e8a304114ea0c0b8aef3c
Author: HZY <[email protected]>
AuthorDate: Wed Apr 26 15:31:28 2023 +0800
[hive] Introduce Hive writer (#752)
---
docs/content/engines/hive.md | 31 +-
docs/content/engines/overview.md | 2 +-
.../org/apache/paimon/hive/HiveDeserializer.java | 308 ++++++
.../java/org/apache/paimon/hive/HiveSchema.java | 12 +
.../java/org/apache/paimon/hive/PaimonJobConf.java | 15 +
.../java/org/apache/paimon/hive/PaimonSerDe.java | 26 +-
.../apache/paimon/hive/PaimonStorageHandler.java | 18 +-
.../java/org/apache/paimon/hive/SchemaVisitor.java | 114 ++
.../paimon/hive/mapred/PaimonInputFormat.java | 35 +-
.../paimon/hive/mapred/PaimonOutputCommitter.java | 280 +++++
.../paimon/hive/mapred/PaimonOutputFormat.java | 55 +-
.../paimon/hive/mapred/PaimonRecordWriter.java | 98 ++
.../hive/objectinspector/HivePaimonArray.java | 196 ++++
.../objectinspector/PaimonCharObjectInspector.java | 7 +-
.../objectinspector/PaimonDateObjectInspector.java | 15 +-
.../PaimonDecimalObjectInspector.java | 17 +-
.../PaimonObjectInspectorFactory.java | 14 +
.../PaimonStringObjectInspector.java | 7 +-
.../PaimonTimestampObjectInspector.java | 16 +-
.../PaimonVarcharObjectInspector.java | 7 +-
.../objectinspector/WriteableObjectInspector.java | 25 +
.../org/apache/paimon/hive/utils/HiveUtils.java | 64 ++
.../org/apache/paimon/hive/FileStoreTestUtils.java | 19 +-
.../org/apache/paimon/hive/HiveWriteITCase.java | 1126 ++++++++++++++++++++
.../paimon/hive/PaimonStorageHandlerITCase.java | 40 +-
.../paimon/hive/RandomGenericRowDataGenerator.java | 7 +-
26 files changed, 2493 insertions(+), 61 deletions(-)
diff --git a/docs/content/engines/hive.md b/docs/content/engines/hive.md
index 9691e3386..5016880a2 100644
--- a/docs/content/engines/hive.md
+++ b/docs/content/engines/hive.md
@@ -34,7 +34,7 @@ Paimon currently supports Hive 2.1, 2.1-cdh-6.3, 2.2, 2.3 and
3.1.
## Execution Engine
-Paimon currently supports MR and Tez execution engine for Hive.
+Paimon currently supports the MR and Tez execution engines for Hive reads, and the MR execution engine for Hive writes.
## Installation
@@ -182,6 +182,35 @@ OK
1 Table
2 Store
*/
+
+-- Insert records into test table
+
+INSERT INTO test_table VALUES (3, 'Paimon');
+
+SELECT a, b FROM test_table ORDER BY a;
+
+/*
+OK
+1 Table
+2 Store
+3 Paimon
+*/
+-- Insert records into test table from a query (here, a SELECT on the same table)
+
+INSERT INTO test_table SELECT a, b FROM test_table;
+
+SELECT a, b FROM test_table ORDER BY a;
+
+/*
+OK
+1 Table
+1 Table
+2 Store
+2 Store
+3 Paimon
+3 Paimon
+*/
+
```
## Hive Type Conversion
diff --git a/docs/content/engines/overview.md b/docs/content/engines/overview.md
index 172914082..16200161a 100644
--- a/docs/content/engines/overview.md
+++ b/docs/content/engines/overview.md
@@ -35,7 +35,7 @@ Apache Spark and Apache Hive.
| Engine | Version | Feature
| Read Pushdown |
|--------|-----------------------------|--------------------------------------------------------------------------------------|--------------------|
| Flink | 1.17/1.16/1.15/1.14 | batch/streaming read, batch/streaming
write, create/drop table, create/drop database | Projection, Filter |
-| Hive | 3.1/2.3/2.2/2.1/2.1 CDH 6.3 | batch read
| Projection, Filter |
+| Hive | 3.1/2.3/2.2/2.1/2.1 CDH 6.3 | batch read, batch write
| Projection, Filter |
| Spark | 3.4/3.3/3.2/3.1 | batch read, batch write, create/drop
table, create/drop database | Projection, Filter |
| Spark | 2.4 | batch read
| Projection, Filter |
| Trino | 388/358 | batch read, create/drop table,
create/drop database | Projection, Filter |
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveDeserializer.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveDeserializer.java
new file mode 100644
index 000000000..005169079
--- /dev/null
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveDeserializer.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.hive.objectinspector.HivePaimonArray;
+import org.apache.paimon.hive.objectinspector.WriteableObjectInspector;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Converts Hive row objects into Paimon {@link InternalRow}s.
+ *
+ * <p>On construction a tree of {@link FieldDeserializer}s is built by walking the Paimon row type
+ * of the {@link HiveSchema} with {@link SchemaVisitor}, pairing every type node with the matching
+ * Paimon-side (writer) and Hive-side (source) ObjectInspectors. {@link #deserialize(Object)} then
+ * only runs that tree over the given Hive object.
+ */
+public class HiveDeserializer {
+ private final FieldDeserializer fieldDeserializer; // root of the deserializer tree, built once in the constructor
+
+ /**
+ * Builder to create a HiveDeserializer instance. Requires a Paimon HiveSchema and the Hive
+ * ObjectInspectors for converting the data: the writer inspector (Paimon side, used to navigate
+ * by table field names and to convert values) and the source inspector (Hive side, used to read
+ * the incoming values).
+ *
+ * <p>{@link #build()} does not check that all three values have been set.
+ */
+ static class Builder {
+ private HiveSchema schema;
+ private StructObjectInspector writerInspector;
+ private StructObjectInspector sourceInspector;
+
+ Builder schema(HiveSchema mainSchema) {
+ this.schema = mainSchema;
+ return this;
+ }
+
+ Builder writerInspector(StructObjectInspector inspector) {
+ this.writerInspector = inspector;
+ return this;
+ }
+
+ Builder sourceInspector(StructObjectInspector inspector) {
+ this.sourceInspector = inspector;
+ return this;
+ }
+
+ HiveDeserializer build() {
+ return new HiveDeserializer(
+ schema, new ObjectInspectorPair(writerInspector,
sourceInspector));
+ }
+ }
+
+ /**
+ * Deserializes the Hive result object to a Paimon record using the provided ObjectInspectors.
+ *
+ * @param data the Hive row object to deserialize, readable through the source inspector
+ * @return the resulting Paimon row, or {@code null} if {@code data} is {@code null}
+ */
+ InternalRow deserialize(Object data) {
+ return (InternalRow) fieldDeserializer.value(data);
+ }
+
+ private HiveDeserializer(HiveSchema schema, ObjectInspectorPair pair) { // use Builder to create instances
+ this.fieldDeserializer = DeserializerVisitor.visit(schema, pair);
+ }
+
+ private static class DeserializerVisitor
+ extends SchemaVisitor<ObjectInspectorPair, FieldDeserializer> { // builds one FieldDeserializer per Paimon type node
+
+ public static FieldDeserializer visit(HiveSchema schema,
ObjectInspectorPair pair) { // wraps the root pair so top-level table names map to source field names
+ return visit(
+ schema,
+ new SchemaNameMappingObjectInspectorPair(schema, pair),
+ new DeserializerVisitor(),
+ new PartnerObjectInspectorByNameAccessors());
+ }
+
+ @Override
+ public FieldDeserializer primitive(DataType type, ObjectInspectorPair
pair) { // reads the Java value via the Hive inspector, then lets the Paimon inspector convert it
+ return o -> {
+ if (o == null) {
+ return null;
+ }
+
+ ObjectInspector writerFieldInspector = pair.writerInspector();
+ ObjectInspector sourceFieldInspector = pair.sourceInspector();
+
+ Object result =
+ ((PrimitiveObjectInspector)
sourceFieldInspector).getPrimitiveJavaObject(o);
+ if (writerFieldInspector instanceof WriteableObjectInspector) { // other writer inspectors pass the Java value through unchanged
+ result = ((WriteableObjectInspector)
writerFieldInspector).convert(result);
+ }
+
+ return result;
+ };
+ }
+
+ @Override
+ public FieldDeserializer rowType(
+ RowType type, ObjectInspectorPair pair,
List<FieldDeserializer> deserializers) { // deserializers holds the per-field results, in RowType field order
+ Preconditions.checkNotNull(type, "Can not create deserializer for
null type");
+
+ return o -> {
+ if (o == null) {
+ return null;
+ }
+
+ List<Object> data =
+ ((StructObjectInspector) pair.sourceInspector())
+ .getStructFieldsDataAsList(o);
+
+ GenericRow row = new GenericRow(data.size()); // NOTE(review): arity follows the source struct while deserializers follows the schema; assumes equal field counts
+
+ for (int i = 0; i < data.size(); i++) {
+ Object fieldValue = data.get(i);
+ if (fieldValue != null) {
+ row.setField(i,
deserializers.get(i).value(fieldValue));
+ } else {
+ row.setField(i, null);
+ }
+ }
+
+ return row;
+ };
+ }
+
+ @Override
+ public FieldDeserializer list(
+ ArrayType type, ObjectInspectorPair pair, FieldDeserializer
deserializer) { // deserializer converts a single element
+ return o -> {
+ if (o == null) {
+ return null;
+ }
+
+ List<Object> result = Lists.newArrayList();
+ ListObjectInspector listInspector = (ListObjectInspector)
pair.sourceInspector();
+ for (Object val : listInspector.getList(o)) {
+ result.add(deserializer.value(val));
+ }
+ return new HivePaimonArray(type.getElementType(), result);
+ };
+ }
+
+ @Override
+ public FieldDeserializer map(
+ MapType mapType,
+ ObjectInspectorPair pair,
+ FieldDeserializer keyDeserializer,
+ FieldDeserializer valueDeserializer) {
+ return o -> {
+ if (o == null) {
+ return null;
+ }
+ List<Object> keys = new ArrayList<>();
+ List<Object> values = new ArrayList<>();
+ MapObjectInspector mapObjectInspector = (MapObjectInspector)
pair.sourceInspector();
+ Map<?, ?> map = mapObjectInspector.getMap(o);
+ for (Map.Entry<?, ?> entry : map.entrySet()) { // keys and values are filled in the same iteration, so index i pairs them
+ keys.add(keyDeserializer.value(entry.getKey()));
+ values.add(valueDeserializer.value(entry.getValue()));
+ }
+ HivePaimonArray key = new
HivePaimonArray(mapType.getKeyType(), keys);
+ HivePaimonArray value = new
HivePaimonArray(mapType.getValueType(), values);
+ return new InternalMap() { // view over the two parallel arrays built above
+ @Override
+ public int size() {
+ return map.size();
+ }
+
+ @Override
+ public InternalArray keyArray() {
+ return key;
+ }
+
+ @Override
+ public InternalArray valueArray() {
+ return value;
+ }
+ };
+ };
+ }
+ }
+
+ private static class PartnerObjectInspectorByNameAccessors
+ implements SchemaVisitor.PartnerAccessors<ObjectInspectorPair> { // derives child inspector pairs for fields, map keys/values and list elements
+
+ @Override
+ public ObjectInspectorPair fieldPartner(ObjectInspectorPair pair,
String name) {
+ String sourceName = pair.sourceName(name); // writer side is looked up by table name, source side by the (possibly mapped) source name
+ return new ObjectInspectorPair( // NOTE(review): child is a plain pair, so nested struct fields use their table names on the source side
+ ((StructObjectInspector) pair.writerInspector())
+ .getStructFieldRef(name)
+ .getFieldObjectInspector(),
+ ((StructObjectInspector) pair.sourceInspector())
+ .getStructFieldRef(sourceName)
+ .getFieldObjectInspector());
+ }
+
+ @Override
+ public ObjectInspectorPair mapKeyPartner(ObjectInspectorPair pair) {
+ return new ObjectInspectorPair(
+ ((MapObjectInspector)
pair.writerInspector()).getMapKeyObjectInspector(),
+ ((MapObjectInspector)
pair.sourceInspector()).getMapKeyObjectInspector());
+ }
+
+ @Override
+ public ObjectInspectorPair mapValuePartner(ObjectInspectorPair pair) {
+ return new ObjectInspectorPair(
+ ((MapObjectInspector)
pair.writerInspector()).getMapValueObjectInspector(),
+ ((MapObjectInspector)
pair.sourceInspector()).getMapValueObjectInspector());
+ }
+
+ @Override
+ public ObjectInspectorPair listPartner(ObjectInspectorPair pair) {
+ return new ObjectInspectorPair(
+ ((ListObjectInspector)
pair.writerInspector()).getListElementObjectInspector(),
+ ((ListObjectInspector)
pair.sourceInspector()).getListElementObjectInspector());
+ }
+ }
+
+ private interface FieldDeserializer { // converts one Hive value (read via the source inspector) into its Paimon form; null in, null out
+ Object value(Object object);
+ }
+
+ /**
+ * The column names in the Hive query result (the source inspector) may not match the actual
+ * table column names, so this pair maps each table field name to the source field name at the
+ * same position.
+ *
+ * <p>NOTE(review): the mapping is purely positional and calls {@code fields.get(i)} for every
+ * schema field, so it assumes the source struct has at least as many fields as the table
+ * schema, in the same order.
+ */
+ private static class SchemaNameMappingObjectInspectorPair extends
ObjectInspectorPair {
+ private final Map<String, String> sourceNameMap;
+
+ SchemaNameMappingObjectInspectorPair(HiveSchema schema,
ObjectInspectorPair pair) {
+ super(pair.writerInspector(), pair.sourceInspector());
+
+ this.sourceNameMap =
Maps.newHashMapWithExpectedSize(schema.fields().size());
+
+ List<? extends StructField> fields =
+ ((StructObjectInspector)
sourceInspector()).getAllStructFieldRefs();
+ for (int i = 0; i < schema.fields().size(); ++i) {
+ sourceNameMap.put(schema.fields().get(i).name(),
fields.get(i).getFieldName());
+ }
+ }
+
+ @Override
+ String sourceName(String originalName) { // returns null for names that are not top-level table fields
+ return sourceNameMap.get(originalName);
+ }
+ }
+
+ /**
+ * To get the data for Paimon {@link GenericRow}s we have to use both ObjectInspectors.
+ *
+ * <p>We use the Hive ObjectInspectors (sourceInspector) to get the Hive primitive types.
+ *
+ * <p>We use the Paimon ObjectInspectors (writerInspector) to generate the correct type for
+ * Paimon records.
+ *
+ * <p>{@link #sourceName(String)} returns the name unchanged here; subclasses may map table
+ * field names to the field names used by the source inspector.
+ */
+ private static class ObjectInspectorPair {
+ private ObjectInspector writerInspector; // NOTE(review): never reassigned after construction; could be final
+ private ObjectInspector sourceInspector; // NOTE(review): never reassigned after construction; could be final
+
+ ObjectInspectorPair(ObjectInspector writerInspector, ObjectInspector
sourceInspector) {
+ this.writerInspector = writerInspector;
+ this.sourceInspector = sourceInspector;
+ }
+
+ ObjectInspector writerInspector() {
+ return writerInspector;
+ }
+
+ ObjectInspector sourceInspector() {
+ return sourceInspector;
+ }
+
+ String sourceName(String originalName) {
+ return originalName;
+ }
+ }
+}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
index 72501baf7..5be40daf6 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
@@ -26,6 +26,7 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -48,8 +49,15 @@ public class HiveSchema {
private final TableSchema tableSchema;
+ private final RowType rowType;
+
private HiveSchema(TableSchema tableSchema) {
this.tableSchema = tableSchema;
+ this.rowType = new RowType(tableSchema.fields());
+ }
+
+ public RowType rowType() { // row type built from all table fields in the constructor
+ return rowType;
}
public List<String> fieldNames() {
@@ -60,6 +68,10 @@ public class HiveSchema {
return tableSchema.logicalRowType().getFieldTypes();
}
+ public List<DataField> fields() { // all fields of the underlying TableSchema, in schema order
+ return tableSchema.fields();
+ }
+
public List<String> fieldComments() {
return tableSchema.fields().stream()
.map(DataField::description)
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonJobConf.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonJobConf.java
index 62cecde57..971bba50f 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonJobConf.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonJobConf.java
@@ -18,6 +18,7 @@
package org.apache.paimon.hive;
+import org.apache.paimon.hive.mapred.PaimonOutputCommitter;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.JsonSerdeUtil;
@@ -38,6 +39,10 @@ public class PaimonJobConf {
private static final String INTERNAL_LOCATION = "paimon.internal.location";
private static final String INTERNAL_CATALOG_CONFIG =
"paimon.catalog.config";
+ public static final String MAPRED_OUTPUT_COMMITTER =
"mapred.output.committer.class";
+
+ public static final String PAIMON_WRITE = "paimon.write";
+
private static final String PAIMON_PREFIX = "paimon.";
private final JobConf jobConf;
@@ -56,6 +61,16 @@ public class PaimonJobConf {
properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION));
}
+ public static void configureOutputJobProperties( // fills the job properties Hive needs for a Paimon write
+ Configuration configuration, Properties properties, Map<String,
String> map) { // NOTE(review): configuration is currently unused
+ map.put(
+ INTERNAL_LOCATION,
+
properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION));
+ map.put(MAPRED_OUTPUT_COMMITTER,
PaimonOutputCommitter.class.getName());
+ map.put(PAIMON_WRITE, Boolean.TRUE.toString());
+ properties.put(PAIMON_WRITE, Boolean.TRUE.toString()); // mutates the table properties in place so PaimonStorageHandler#configureJobConf can detect the write
+ }
+
public String getLocation() {
return jobConf.get(INTERNAL_LOCATION);
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonSerDe.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonSerDe.java
index 3bf5bd9ae..c67231924 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonSerDe.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonSerDe.java
@@ -20,15 +20,19 @@ package org.apache.paimon.hive;
import org.apache.paimon.hive.objectinspector.PaimonInternalRowObjectInspector;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.Writable;
import javax.annotation.Nullable;
+import java.util.Map;
import java.util.Properties;
/**
@@ -39,11 +43,18 @@ import java.util.Properties;
public class PaimonSerDe extends AbstractSerDe {
private PaimonInternalRowObjectInspector inspector;
+ private HiveSchema tableSchema; // set in initialize(); used to build deserializers in serialize()
+
+ private final RowDataContainer rowData = new RowDataContainer(); // single container returned by every serialize() call
+
+ private final Map<ObjectInspector, HiveDeserializer> deserializers = // one lazily built deserializer per source ObjectInspector
+ Maps.newHashMapWithExpectedSize(1);
@Override
public void initialize(@Nullable Configuration configuration, Properties
properties)
throws SerDeException {
HiveSchema schema = HiveSchema.extract(configuration, properties);
+ this.tableSchema = schema;
inspector =
new PaimonInternalRowObjectInspector(
schema.fieldNames(), schema.fieldTypes(),
schema.fieldComments());
@@ -56,8 +67,19 @@ public class PaimonSerDe extends AbstractSerDe {
@Override
public Writable serialize(Object o, ObjectInspector objectInspector)
throws SerDeException {
- throw new UnsupportedOperationException(
- "PaimonSerDe currently only supports deserialization.");
+ HiveDeserializer deserializer = deserializers.get(objectInspector); // reuse the deserializer already built for this source inspector
+ if (deserializer == null) {
+ deserializer =
+ new HiveDeserializer.Builder()
+ .schema(tableSchema)
+ .sourceInspector((StructObjectInspector)
objectInspector) // assumes Hive passes a struct inspector for the row
+ .writerInspector(inspector)
+ .build();
+ deserializers.put(objectInspector, deserializer);
+ }
+
+ rowData.set(deserializer.deserialize(o)); // NOTE(review): the same container is returned every call; callers must consume it before the next serialize()
+ return rowData;
}
@Override
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
index 1a3cf8bbe..2fe0e9e49 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
@@ -19,6 +19,7 @@
package org.apache.paimon.hive;
import org.apache.paimon.hive.mapred.PaimonInputFormat;
+import org.apache.paimon.hive.mapred.PaimonOutputCommitter;
import org.apache.paimon.hive.mapred.PaimonOutputFormat;
import org.apache.hadoop.conf.Configuration;
@@ -39,6 +40,9 @@ import org.apache.hadoop.mapred.OutputFormat;
import java.util.Map;
import java.util.Properties;
+import static org.apache.paimon.hive.PaimonJobConf.MAPRED_OUTPUT_COMMITTER;
+import static org.apache.paimon.hive.PaimonJobConf.PAIMON_WRITE;
+
/** {@link HiveStorageHandler} for paimon. This is the entrance class of Hive
API. */
public class PaimonStorageHandler implements HiveStoragePredicateHandler,
HiveStorageHandler {
@@ -78,13 +82,23 @@ public class PaimonStorageHandler implements
HiveStoragePredicateHandler, HiveSt
public void configureInputJobCredentials(TableDesc tableDesc, Map<String,
String> map) {}
@Override
- public void configureOutputJobProperties(TableDesc tableDesc, Map<String,
String> map) {}
+ public void configureOutputJobProperties(TableDesc tableDesc, Map<String,
String> map) { // delegates to PaimonJobConf, which also marks the table properties for writing
+ Properties properties = tableDesc.getProperties();
+ PaimonJobConf.configureOutputJobProperties(conf, properties, map);
+ }
@Override
public void configureTableJobProperties(TableDesc tableDesc, Map<String,
String> map) {}
@Override
- public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {}
+ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) { // installs the Paimon committer only for tables marked by configureOutputJobProperties
+ if (tableDesc != null
+ && tableDesc.getProperties() != null
+ && tableDesc.getProperties().get(PAIMON_WRITE) != null) { // presence of the marker is enough; its value is not checked
+
+ jobConf.set(MAPRED_OUTPUT_COMMITTER,
PaimonOutputCommitter.class.getName());
+ }
+ }
@Override
public void setConf(Configuration configuration) {
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/SchemaVisitor.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/SchemaVisitor.java
new file mode 100644
index 000000000..c2fba0ad1
--- /dev/null
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/SchemaVisitor.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Visitor that walks a Paimon {@link DataType} tree (rows, arrays, maps and leaf types) together
+ * with a "partner" object of type {@code P}, producing a result of type {@code R} for every node.
+ *
+ * <p>The partner of each child node is derived from its parent's partner through {@link
+ * PartnerAccessors}; in the Hive connector the partner is a pair of source/writer
+ * ObjectInspectors and the result is a field deserializer. Children are visited before their
+ * parent, and every callback returns {@code null} unless a subclass overrides it.
+ */
+public abstract class SchemaVisitor<P, R> {
+
+ /**
+ * Derives the partner of a child node (row field, map key/value, list element) from the partner
+ * of its parent. It is only invoked when the parent partner is non-null; otherwise the child
+ * partner is {@code null}.
+ */
+ public interface PartnerAccessors<P> {
+
+ P fieldPartner(P partnerRow, String name);
+
+ P mapKeyPartner(P partnerMap);
+
+ P mapValuePartner(P partnerMap);
+
+ P listPartner(P partnerList);
+ }
+
+ public static <P, T> T visit( // convenience entry point: visits the schema's row type
+ HiveSchema schema,
+ P partner,
+ SchemaVisitor<P, T> visitor,
+ PartnerAccessors<P> accessors) {
+ return visit(schema.rowType(), partner, visitor, accessors);
+ }
+
+ public static <P, T> T visit( // visits children first, then passes their results to the matching callback
+ DataType type, P partner, SchemaVisitor<P, T> visitor,
PartnerAccessors<P> accessors) {
+ switch (type.getTypeRoot()) {
+ case ROW:
+ List<DataField> fields = ((RowType) type).getFields();
+ List<T> results =
Lists.newArrayListWithExpectedSize(fields.size());
+ for (DataField field : fields) {
+ P fieldPartner =
+ partner != null ? accessors.fieldPartner(partner,
field.name()) : null;
+ T result;
+ result = visit(field.type(), fieldPartner, visitor,
accessors);
+
+ results.add(result);
+ }
+ return visitor.rowType((RowType) type, partner, results);
+
+ case ARRAY:
+ ArrayType list = (ArrayType) type;
+ T result;
+ DataType elementType = list.getElementType();
+ P partnerElement = partner != null ?
accessors.listPartner(partner) : null;
+ result = visit(elementType, partnerElement, visitor,
accessors);
+
+ return visitor.list(list, partner, result);
+
+ case MAP:
+ MapType map = (MapType) type;
+ T keyResult;
+ T valueResult;
+
+ P keyPartner = partner != null ?
accessors.mapKeyPartner(partner) : null;
+ keyResult = visit(map.getKeyType(), keyPartner, visitor,
accessors);
+
+ P valuePartner = partner != null ?
accessors.mapValuePartner(partner) : null;
+ valueResult = visit(map.getValueType(), valuePartner, visitor,
accessors);
+ return visitor.map(map, partner, keyResult, valueResult);
+
+ default: // every other type root is handed to primitive() as a leaf
+ return visitor.primitive(type, partner);
+ }
+ }
+
+ public R rowType(RowType rowType, P partner, List<R> fieldResults) { // default callbacks return null; subclasses override what they need
+ return null;
+ }
+
+ public R list(ArrayType list, P partner, R elementResult) {
+ return null;
+ }
+
+ public R map(MapType map, P partner, R keyResult, R valueResult) {
+ return null;
+ }
+
+ public R primitive(DataType primitive, P partner) {
+ return null;
+ }
+}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
index 411ea0d03..e25ba3c2c 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
@@ -18,22 +18,12 @@
package org.apache.paimon.hive.mapred;
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.hive.PaimonJobConf;
import org.apache.paimon.hive.RowDataContainer;
-import org.apache.paimon.hive.SearchArgumentToPredicateConverter;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
@@ -43,7 +33,9 @@ import org.apache.hadoop.mapred.Reporter;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Optional;
+
+import static org.apache.paimon.hive.utils.HiveUtils.createFileStoreTable;
+import static org.apache.paimon.hive.utils.HiveUtils.createPredicate;
/**
* {@link InputFormat} for paimon. It divides all files into {@link
InputSplit}s (one split per
@@ -75,27 +67,6 @@ public class PaimonInputFormat implements InputFormat<Void,
RowDataContainer> {
Arrays.asList(getSelectedColumns(jobConf)));
}
- private FileStoreTable createFileStoreTable(JobConf jobConf) {
- PaimonJobConf wrapper = new PaimonJobConf(jobConf);
- Options options = PaimonJobConf.extractCatalogConfig(jobConf);
- options.set(CoreOptions.PATH, wrapper.getLocation());
- CatalogContext catalogContext = CatalogContext.create(options,
jobConf);
- return FileStoreTableFactory.create(catalogContext);
- }
-
- private Optional<Predicate> createPredicate(TableSchema tableSchema,
JobConf jobConf) {
- SearchArgument sarg = ConvertAstToSearchArg.createFromConf(jobConf);
- if (sarg == null) {
- return Optional.empty();
- }
- SearchArgumentToPredicateConverter converter =
- new SearchArgumentToPredicateConverter(
- sarg,
- tableSchema.fieldNames(),
- tableSchema.logicalRowType().getFieldTypes());
- return converter.convert();
- }
-
private String[] getSelectedColumns(JobConf jobConf) {
// when using tez engine or when same table is joined multiple times,
// it is possible that some selected columns are duplicated
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java
new file mode 100644
index 000000000..221447e61
--- /dev/null
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.mapred;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.hive.utils.HiveUtils.createFileStoreTable;
+
+/**
+ * A Paimon table committer for adding data files to the Paimon table.
+ *
+ * <p>Each committing task prepares the {@link CommitMessage}s of its writer and serializes them
+ * into a pre-commit file under {@code <table location>/temp/<job id>/}. {@link
+ * #commitJob(JobContext)} collects all pre-commit files and commits their messages at once, while
+ * {@link #abortJob(JobContext, int)} aborts them; afterwards the temporary job directory is
+ * deleted.
+ */
+public class PaimonOutputCommitter extends OutputCommitter {
+
+    private static final String PRE_COMMIT = ".preCommit";
+
+    private static final Logger LOG = LoggerFactory.getLogger(PaimonOutputCommitter.class);
+
+    @Override
+    public void setupJob(JobContext jobContext) throws IOException {}
+
+    @Override
+    public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {}
+
+    /** Only tasks of the last phase commit: reduce tasks, or map tasks of a map-only job. */
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+        // We need to commit if this is the last phase of a MapReduce process
+        return TaskType.REDUCE.equals(
+                        taskAttemptContext.getTaskAttemptID().getTaskID().getTaskType())
+                || taskAttemptContext.getJobConf().getNumReduceTasks() == 0;
+    }
+
+    /**
+     * Prepares the commit messages of this attempt's writer and persists them into the task's
+     * pre-commit file. When the attempt registered no writer for the table, no pre-commit file is
+     * written. Once the writer has been looked up, the attempt's writers are always unregistered,
+     * also when preparing the commit fails.
+     */
+    @Override
+    public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+
+        TaskAttemptID attemptID = taskAttemptContext.getTaskAttemptID();
+        JobConf jobConf = taskAttemptContext.getJobConf();
+        FileStoreTable table = createFileStoreTable(jobConf);
+
+        Map<String, PaimonRecordWriter> writers =
+                Optional.ofNullable(PaimonRecordWriter.getWriters(attemptID))
+                        .orElseGet(
+                                () -> {
+                                    LOG.info(
+                                            "CommitTask found no writers for output table: {}, attemptID: {}",
+                                            table.name(),
+                                            attemptID);
+                                    return ImmutableMap.of();
+                                });
+        PaimonRecordWriter writer = writers.get(table.name());
+        try {
+            if (writer != null) {
+                // try-with-resources closes the write even if preparing the commit fails
+                try (BatchTableWrite batchTableWrite = writer.batchTableWrite()) {
+                    List<CommitMessage> commitTables = batchTableWrite.prepareCommit();
+                    createPreCommitFile(
+                            commitTables,
+                            generatePreCommitFileLocation(
+                                    table.location().getPath(),
+                                    attemptID.getJobID(),
+                                    attemptID.getTaskID().getId()),
+                            table.fileIO());
+                } catch (Exception e) {
+                    LOG.error(
+                            "CommitTask prepareCommit error for specific table: {}, attemptID: {}",
+                            table.name(),
+                            attemptID,
+                            e);
+                    throw new RuntimeException(e);
+                }
+            } else {
+                LOG.info(
+                        "CommitTask found no writer for specific table: {}, attemptID: {}",
+                        table.name(),
+                        attemptID);
+            }
+        } finally {
+            // Release the registry entry even on failure; otherwise the static writer map in
+            // PaimonRecordWriter keeps referencing this attempt's writers.
+            PaimonRecordWriter.removeWriters(attemptID);
+        }
+    }
+
+    /** Unregisters the writers of this attempt and closes them with {@code abort = true}. */
+    @Override
+    public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+        Map<String, PaimonRecordWriter> writers =
+                PaimonRecordWriter.removeWriters(taskAttemptContext.getTaskAttemptID());
+
+        // close writer and delete files
+        if (writers != null) {
+            for (PaimonRecordWriter writer : writers.values()) {
+                writer.close(true);
+            }
+        }
+    }
+
+    /**
+     * Reads the pre-commit files of all tasks and commits their messages to the table in a single
+     * {@link BatchTableCommit}, then deletes the job's temporary directory.
+     */
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+        JobConf jobConf = jobContext.getJobConf();
+
+        long startTime = System.currentTimeMillis();
+        LOG.info("CommitJob {} has started", jobContext.getJobID());
+        FileStoreTable table = createFileStoreTable(jobConf);
+
+        if (table != null) {
+            BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
+            List<CommitMessage> commitMessagesList =
+                    getAllPreCommitMessage(table.location().getPath(), jobContext, table.fileIO());
+            try (BatchTableCommit batchTableCommit = batchWriteBuilder.newCommit()) {
+                batchTableCommit.commit(commitMessagesList);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            deleteTemporaryFile(
+                    jobContext,
+                    generateJobLocation(table.location().getPath(), jobContext.getJobID()));
+        } else {
+            LOG.info("CommitJob not found table, Skipping job commit.");
+        }
+
+        LOG.info(
+                "Commit took {} ms for job {}",
+                System.currentTimeMillis() - startTime,
+                jobContext.getJobID());
+    }
+
+    /**
+     * Aborts the commit messages found in the tasks' pre-commit files and deletes the job's
+     * temporary directory. The {@code status} argument is not used.
+     */
+    @Override
+    public void abortJob(JobContext jobContext, int status) throws IOException {
+        FileStoreTable table = createFileStoreTable(jobContext.getJobConf());
+        if (table != null) {
+
+            LOG.info("AbortJob {} has started", jobContext.getJobID());
+            List<CommitMessage> commitMessagesList =
+                    getAllPreCommitMessage(table.location().getPath(), jobContext, table.fileIO());
+            BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
+            try (BatchTableCommit batchTableCommit = batchWriteBuilder.newCommit()) {
+                batchTableCommit.abort(commitMessagesList);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            deleteTemporaryFile(
+                    jobContext,
+                    generateJobLocation(table.location().getPath(), jobContext.getJobID()));
+            LOG.info("Job {} is aborted. preCommit file has deleted", jobContext.getJobID());
+        }
+    }
+
+    /**
+     * Delete job's temporary locations. A failed deletion is logged as a warning and does not fail
+     * the job.
+     *
+     * @param jobContext The job context
+     * @param location The location to clean up (deleted recursively)
+     * @throws IOException declared for callers; deletion failures are caught and logged
+     */
+    private void deleteTemporaryFile(JobContext jobContext, String location) throws IOException {
+        JobConf jobConf = jobContext.getJobConf();
+
+        LOG.info("Deleting temporary file for job {} started", jobContext.getJobID());
+
+        LOG.info("The deleted file is located in : {}", location);
+        try {
+            org.apache.hadoop.fs.Path deleteFilePath = new org.apache.hadoop.fs.Path(location);
+            FileSystem fs = deleteFilePath.getFileSystem(jobConf);
+            fs.delete(deleteFilePath, true);
+        } catch (IOException e) {
+            // best effort, but leftover temporary files should be visible to operators
+            LOG.warn("Failed to delete directory {} ", location, e);
+        }
+        LOG.info("Deleting temporary file for job {} finished", jobContext.getJobID());
+    }
+
+    /**
+     * Get the CommitMessages of all tasks of the job. Tasks are identified by index: the number of
+     * reduce tasks if the job has any, otherwise the number of map tasks.
+     *
+     * @param location The location of the table
+     * @param jobContext The job context
+     * @param io The FileIO used for reading the files generated for commit
+     * @return The commit messages of every task that wrote a pre-commit file
+     */
+    private static List<CommitMessage> getAllPreCommitMessage(
+            String location, JobContext jobContext, FileIO io) {
+        JobConf conf = jobContext.getJobConf();
+
+        int totalCommitMessagesSize =
+                conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
+
+        // filled sequentially below, so no synchronization is needed
+        List<CommitMessage> commitMessagesList = new ArrayList<>();
+
+        for (int i = 0; i < totalCommitMessagesSize; i++) {
+            String commitFileLocation =
+                    generatePreCommitFileLocation(location, jobContext.getJobID(), i);
+            commitMessagesList.addAll(readPreCommitFile(commitFileLocation, io));
+        }
+
+        return commitMessagesList;
+    }
+
+    /**
+     * Generates the job temp location based on the job configuration.
+     *
+     * @param location The location of the table
+     * @param jobId The JobID for the task
+     * @return The file to store the results
+     */
+    static String generateJobLocation(String location, JobID jobId) {
+        return location + "/temp/" + jobId;
+    }
+
+    /**
+     * Generates preCommit file location based on the configuration and a specific task id. In
+     * order to ensure the atomicity of the commit, we need to keep preCommit persistent, restore it
+     * in {@link PaimonOutputCommitter#commitJob(JobContext)} to complete the final commit, and
+     * delete the temporary files at the end.
+     *
+     * @param location The location of the table
+     * @param jobId jobId
+     * @param taskId taskId
+     * @return The location of preCommit file path
+     */
+    private static String generatePreCommitFileLocation(String location, JobID jobId, int taskId) {
+        return generateJobLocation(location, jobId) + "/task_" + taskId + PRE_COMMIT;
+    }
+
+    /**
+     * Creates (or overwrites) a temporary pre-commit file holding the Java-serialized result of
+     * {@link BatchTableWrite#prepareCommit()}.
+     *
+     * @param commitTables The commit messages prepared by the task's table write
+     * @param location The temp file's location
+     * @param io The FileIO of the table
+     * @throws IOException if the file cannot be written
+     */
+    private static void createPreCommitFile(
+            List<CommitMessage> commitTables, String location, FileIO io) throws IOException {
+        try (ObjectOutputStream objectOutputStream =
+                new ObjectOutputStream(io.newOutputStream(new Path(location), true))) {
+            objectOutputStream.writeObject(commitTables);
+        }
+    }
+
+    /**
+     * Reads the commit messages written by {@link #createPreCommitFile}. A missing file yields an
+     * empty list: a committing task without a registered writer does not create one (see {@link
+     * #commitTask(TaskAttemptContext)}).
+     */
+    @SuppressWarnings("unchecked") // the file only ever contains a List<CommitMessage>
+    private static List<CommitMessage> readPreCommitFile(String location, FileIO io) {
+        Path path = new Path(location);
+        try {
+            if (!io.exists(path)) {
+                LOG.info("PreCommit file {} does not exist, skipping it.", location);
+                return Collections.emptyList();
+            }
+            try (ObjectInputStream objectInputStream =
+                    new ObjectInputStream(io.newInputStream(path))) {
+                return (List<CommitMessage>) objectInputStream.readObject();
+            }
+        } catch (ClassNotFoundException | IOException e) {
+            throw new RuntimeException(
+                    String.format("Can not read or parse CommitMessage file: %s", location), e);
+        }
+    }
+}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
index 6259fcb71..d40cf3781 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
@@ -18,27 +18,70 @@
package org.apache.paimon.hive.mapred;
-import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.hive.RowDataContainer;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.util.Progressable;
import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
-/** {@link OutputFormat} for table split. Currently useless. */
-public class PaimonOutputFormat implements OutputFormat<InternalRow,
InternalRow> {
+import static org.apache.paimon.hive.utils.HiveUtils.createFileStoreTable;
+
+/** {@link OutputFormat} for table split. */
+public class PaimonOutputFormat
+ implements OutputFormat<NullWritable, RowDataContainer>,
+ HiveOutputFormat<NullWritable, RowDataContainer> {
+
+ private static final String TASK_ATTEMPT_ID_KEY =
"mapreduce.task.attempt.id";
@Override
- public RecordWriter<InternalRow, InternalRow> getRecordWriter(
+ public RecordWriter<NullWritable, RowDataContainer> getRecordWriter(
FileSystem fileSystem, JobConf jobConf, String s, Progressable
progressable)
throws IOException {
- throw new UnsupportedOperationException(
- "Paimon currently can only be used as an input format for
Hive.");
+ return writer(jobConf);
}
@Override
public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf)
throws IOException {}
+
+    /**
+     * Returns the Paimon record writer for Hive's file sink. The output path, value class,
+     * compression flag and table properties are ignored: the target table is resolved from the
+     * job configuration.
+     */
+    @Override
+    public FileSinkOperator.RecordWriter getHiveRecordWriter(
+            JobConf jobConf,
+            Path finalOutPath,
+            Class<? extends Writable> valueClass,
+            boolean isCompressed,
+            Properties tableProperties,
+            Progressable progress)
+            throws IOException {
+        PaimonRecordWriter recordWriter = writer(jobConf);
+        return recordWriter;
+    }
+
+    /**
+     * Creates the record writer for the table configured in {@code jobConf}. The table is copied
+     * with {@code write-only} forced to {@code true} before a batch write is opened on it, and the
+     * writer is registered under the current task attempt id.
+     *
+     * @throws IllegalStateException if {@code jobConf} contains no task attempt id
+     */
+    private static PaimonRecordWriter writer(JobConf jobConf) {
+        String taskAttemptIdString = jobConf.get(TASK_ATTEMPT_ID_KEY);
+        if (taskAttemptIdString == null) {
+            // TaskAttemptID.forName(null) returns null; fail fast with a clear message instead of
+            // an obscure NullPointerException when the writer registers itself under the id.
+            throw new IllegalStateException(
+                    String.format(
+                            "Can not create Paimon record writer: '%s' is not set in the job configuration.",
+                            TASK_ATTEMPT_ID_KEY));
+        }
+        TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIdString);
+        FileStoreTable table = createFileStoreTable(jobConf);
+        // force write-only = true
+        Map<String, String> newOptions =
+                Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), Boolean.TRUE.toString());
+        FileStoreTable copy = table.copy(newOptions);
+        BatchWriteBuilder batchWriteBuilder = copy.newBatchWriteBuilder();
+        BatchTableWrite batchTableWrite = batchWriteBuilder.newWrite();
+
+        return new PaimonRecordWriter(batchTableWrite, taskAttemptID, copy.name());
+    }
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordWriter.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordWriter.java
new file mode 100644
index 000000000..15a5f78cb
--- /dev/null
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordWriter.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.paimon.hive.mapred;
+
+import org.apache.paimon.hive.RowDataContainer;
+import org.apache.paimon.table.sink.BatchTableWrite;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Record writer used by both Hive's {@link FileSinkOperator} and the mapred API; each row is
+ * written to a Paimon {@link BatchTableWrite}.
+ *
+ * <p>Every writer registers itself in a static registry keyed by task attempt and table name.
+ * {@link PaimonOutputCommitter} uses {@link #getWriters} and {@link #removeWriters} to prepare the
+ * commit of an attempt and to release its writers.
+ */
+class PaimonRecordWriter
+        implements FileSinkOperator.RecordWriter,
+                org.apache.hadoop.mapred.RecordWriter<NullWritable, RowDataContainer> {
+    private static final Logger LOG = LoggerFactory.getLogger(PaimonRecordWriter.class);
+
+    // Each task attempt gets one PaimonRecordWriter per table. The inner map (table name ->
+    // writer) allows supporting multiple table writes in one task in the future.
+    private static final Map<TaskAttemptID, Map<String, PaimonRecordWriter>> writers =
+            Maps.newConcurrentMap();
+
+    private final BatchTableWrite batchTableWrite;
+
+    public PaimonRecordWriter(
+            BatchTableWrite batchTableWrite, TaskAttemptID taskAttemptID, String tableName) {
+        this.batchTableWrite = batchTableWrite;
+        // computeIfAbsent creates and returns the per-attempt map in one atomic step, unlike
+        // putIfAbsent followed by a separate get.
+        writers.computeIfAbsent(taskAttemptID, id -> Maps.newConcurrentMap())
+                .put(tableName, this);
+    }
+
+    /** Unregisters and returns the writers of the given attempt, or {@code null} if none. */
+    static Map<String, PaimonRecordWriter> removeWriters(TaskAttemptID taskAttemptID) {
+        return writers.remove(taskAttemptID);
+    }
+
+    /** Returns the writers registered for the given attempt, or {@code null} if none. */
+    static Map<String, PaimonRecordWriter> getWriters(TaskAttemptID taskAttemptID) {
+        return writers.get(taskAttemptID);
+    }
+
+    /** Writes the row wrapped by the given {@link RowDataContainer}. */
+    @Override
+    public void write(Writable row) throws IOException {
+        try {
+            batchTableWrite.write(((RowDataContainer) row).get());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void write(NullWritable key, RowDataContainer value) throws IOException {
+        write(value);
+    }
+
+    /**
+     * Closes the underlying {@link BatchTableWrite} only when aborting. On a regular close the
+     * write stays open, because {@link PaimonOutputCommitter#commitTask} still prepares the commit
+     * from it and closes it afterwards.
+     */
+    @Override
+    public void close(boolean abort) {
+        if (abort) {
+            try {
+                batchTableWrite.close();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public void close(Reporter reporter) {
+        close(false);
+    }
+
+    /** Returns the underlying table write. */
+    public BatchTableWrite batchTableWrite() {
+        return batchTableWrite;
+    }
+}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/HivePaimonArray.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/HivePaimonArray.java
new file mode 100644
index 000000000..6d453b3fa
--- /dev/null
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/HivePaimonArray.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.objectinspector;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+
+import java.util.List;
+
+/** HivePaimonArray for Array type. */
+public class HivePaimonArray implements InternalArray {
+
+ private final DataType elementType;
+ private final List<Object> list;
+
+ public HivePaimonArray(DataType elementType, List<Object> list) {
+ this.list = list;
+ this.elementType = elementType;
+ }
+
+ @Override
+ public int size() {
+ return list.size();
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return list.get(i) == null;
+ }
+
+ public List<Object> getList() {
+ return list;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> T getAs(int i) {
+ return (T) list.get(i);
+ }
+
+ @Override
+ public boolean getBoolean(int i) {
+ return getAs(i);
+ }
+
+ @Override
+ public byte getByte(int i) {
+ return getAs(i);
+ }
+
+ @Override
+ public short getShort(int i) {
+ return getAs(i);
+ }
+
+ @Override
+ public int getInt(int i) {
+ return getAs(i);
+ }
+
+ @Override
+ public long getLong(int i) {
+ return getAs(i);
+ }
+
+ @Override
+ public float getFloat(int i) {
+ return getAs(i);
+ }
+
+ @Override
+ public double getDouble(int i) {
+ return getAs(i);
+ }
+
+ @Override
+ public BinaryString getString(int i) {
+ return getAs(i);
+ }
+
+ @Override
+ public Decimal getDecimal(int i, int precision, int scale) {
+ return getAs(i);
+ }
+
+ @Override
+ public Timestamp getTimestamp(int i, int precision) {
+ return getAs(i);
+ }
+
+ @Override
+ public byte[] getBinary(int i) {
+ return getAs(i);
+ }
+
+ @Override
+ public InternalArray getArray(int i) {
+ return new HivePaimonArray(
+ ((ArrayType) elementType).getElementType(),
+ ((HivePaimonArray) this.getAs(i)).getList());
+ }
+
+ @Override
+ public InternalMap getMap(int i) {
+ return getAs(i);
+ }
+
+ @Override
+ public InternalRow getRow(int i, int i1) {
+ return getAs(i);
+ }
+
+ @Override
+ public boolean[] toBooleanArray() {
+ boolean[] res = new boolean[size()];
+ for (int i = 0; i < size(); i++) {
+ res[i] = getBoolean(i);
+ }
+ return res;
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ byte[] res = new byte[size()];
+ for (int i = 0; i < size(); i++) {
+ res[i] = getByte(i);
+ }
+ return res;
+ }
+
+ @Override
+ public short[] toShortArray() {
+ short[] res = new short[size()];
+ for (int i = 0; i < size(); i++) {
+ res[i] = getShort(i);
+ }
+ return res;
+ }
+
+ @Override
+ public int[] toIntArray() {
+ int[] res = new int[size()];
+ for (int i = 0; i < size(); i++) {
+ res[i] = getInt(i);
+ }
+ return res;
+ }
+
+ @Override
+ public long[] toLongArray() {
+ long[] res = new long[size()];
+ for (int i = 0; i < size(); i++) {
+ res[i] = getLong(i);
+ }
+ return res;
+ }
+
+ @Override
+ public float[] toFloatArray() {
+ float[] res = new float[size()];
+ for (int i = 0; i < size(); i++) {
+ res[i] = getFloat(i);
+ }
+ return res;
+ }
+
+ @Override
+ public double[] toDoubleArray() {
+ double[] res = new double[size()];
+ for (int i = 0; i < size(); i++) {
+ res[i] = getDouble(i);
+ }
+ return res;
+ }
+}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonCharObjectInspector.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonCharObjectInspector.java
index 97ef6bbad..13c2009f9 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonCharObjectInspector.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonCharObjectInspector.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
/** {@link AbstractPrimitiveJavaObjectInspector} for CHAR type. */
public class PaimonCharObjectInspector extends
AbstractPrimitiveJavaObjectInspector
- implements HiveCharObjectInspector {
+ implements HiveCharObjectInspector, WriteableObjectInspector {
private final int len;
@@ -59,4 +59,9 @@ public class PaimonCharObjectInspector extends
AbstractPrimitiveJavaObjectInspec
return o;
}
}
+
+    /** Converts a Hive CHAR value to a {@link BinaryString} built from its {@code toString()}. */
+    @Override
+    public BinaryString convert(Object value) {
+        if (value == null) {
+            return null;
+        }
+        return BinaryString.fromString(value.toString());
+    }
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java
index f11ef4990..a8eee30f7 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java
@@ -26,10 +26,11 @@ import
org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspect
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import java.sql.Date;
+import java.time.LocalDate;
/** {@link AbstractPrimitiveJavaObjectInspector} for DATE type. */
public class PaimonDateObjectInspector extends
AbstractPrimitiveJavaObjectInspector
- implements DateObjectInspector {
+ implements DateObjectInspector, WriteableObjectInspector {
public PaimonDateObjectInspector() {
super(TypeInfoFactory.dateTypeInfo);
@@ -57,4 +58,16 @@ public class PaimonDateObjectInspector extends
AbstractPrimitiveJavaObjectInspec
return o;
}
}
+
+    /**
+     * Converts a Hive DATE value, given as {@link Date} or {@link LocalDate}, to the int produced
+     * by {@code DateTimeUtils.toInternal}; {@code null} stays {@code null}.
+     */
+    @Override
+    public Integer convert(Object value) {
+        if (value instanceof Date) {
+            return DateTimeUtils.toInternal((Date) value);
+        }
+        return value == null ? null : DateTimeUtils.toInternal((LocalDate) value);
+    }
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDecimalObjectInspector.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDecimalObjectInspector.java
index 83e876c7b..e01c5cd86 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDecimalObjectInspector.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDecimalObjectInspector.java
@@ -26,9 +26,11 @@ import
org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitive
import
org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import java.math.BigDecimal;
+
/** {@link AbstractPrimitiveJavaObjectInspector} for DECIMAL type. */
public class PaimonDecimalObjectInspector extends
AbstractPrimitiveJavaObjectInspector
- implements HiveDecimalObjectInspector {
+ implements HiveDecimalObjectInspector, WriteableObjectInspector {
public PaimonDecimalObjectInspector(int precision, int scale) {
super(TypeInfoFactory.getDecimalTypeInfo(precision, scale));
@@ -56,4 +58,17 @@ public class PaimonDecimalObjectInspector extends
AbstractPrimitiveJavaObjectIns
return o;
}
}
+
+    /**
+     * Converts a {@link HiveDecimal} to a Paimon {@link Decimal} carrying this inspector's scale.
+     * Values with more fractional digits than the declared scale are rounded HALF_UP instead of
+     * making {@link BigDecimal#setScale(int)} throw an {@link ArithmeticException}.
+     *
+     * @param o a {@link HiveDecimal}, or {@code null}
+     * @return the converted decimal, or {@code null} for {@code null} input
+     */
+    @Override
+    public Decimal convert(Object o) {
+        if (o == null) {
+            return null;
+        }
+
+        // during the HiveDecimal to BigDecimal conversion the scale can be lost (e.g. when the
+        // value is 0), so restore the declared scale; round rather than fail on extra digits
+        BigDecimal result =
+                ((HiveDecimal) o)
+                        .bigDecimalValue()
+                        .setScale(scale(), java.math.RoundingMode.HALF_UP);
+
+        return Decimal.fromBigDecimal(result, result.precision(), result.scale());
+    }
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java
index 8dd5bcb85..774291688 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java
@@ -21,15 +21,20 @@ package org.apache.paimon.hive.objectinspector;
import org.apache.paimon.hive.HiveTypeUtils;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import java.util.List;
+import java.util.stream.Collectors;
+
/** Factory to create {@link ObjectInspector}s according to the given {@link
DataType}. */
public class PaimonObjectInspectorFactory {
@@ -70,6 +75,15 @@ public class PaimonObjectInspectorFactory {
case MAP:
MapType mapType = (MapType) logicalType;
return new PaimonMapObjectInspector(mapType.getKeyType(),
mapType.getValueType());
+ case ROW:
+ List<String> fieldComments =
+ ((RowType) logicalType)
+ .getFields().stream()
+ .map(DataField::description)
+ .collect(Collectors.toList());
+ List<DataType> fieldTypes = ((RowType)
logicalType).getFieldTypes();
+ List<String> fieldNames = ((RowType)
logicalType).getFieldNames();
+ return new PaimonInternalRowObjectInspector(fieldNames,
fieldTypes, fieldComments);
default:
throw new UnsupportedOperationException(
"Unsupported logical type " +
logicalType.asSQLString());
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonStringObjectInspector.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonStringObjectInspector.java
index 8917bcafd..9679aebd3 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonStringObjectInspector.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonStringObjectInspector.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.io.Text;
/** {@link AbstractPrimitiveJavaObjectInspector} for STRING type. */
public class PaimonStringObjectInspector extends
AbstractPrimitiveJavaObjectInspector
- implements StringObjectInspector {
+ implements StringObjectInspector, WriteableObjectInspector {
public PaimonStringObjectInspector() {
super(TypeInfoFactory.stringTypeInfo);
@@ -52,4 +52,9 @@ public class PaimonStringObjectInspector extends
AbstractPrimitiveJavaObjectInsp
return o;
}
}
+
+ @Override
+ public BinaryString convert(Object value) {
+ return value == null ? null :
BinaryString.fromString(value.toString());
+ }
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonTimestampObjectInspector.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonTimestampObjectInspector.java
index 0c47475e5..7b3ae41d7 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonTimestampObjectInspector.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonTimestampObjectInspector.java
@@ -25,9 +25,11 @@ import
org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitive
import
org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import java.time.LocalDateTime;
+
/** {@link AbstractPrimitiveJavaObjectInspector} for TIMESTAMP type. */
public class PaimonTimestampObjectInspector extends
AbstractPrimitiveJavaObjectInspector
- implements TimestampObjectInspector {
+ implements TimestampObjectInspector, WriteableObjectInspector {
public PaimonTimestampObjectInspector() {
super(TypeInfoFactory.timestampTypeInfo);
@@ -56,4 +58,16 @@ public class PaimonTimestampObjectInspector extends
AbstractPrimitiveJavaObjectI
return o;
}
}
+
+    /**
+     * Converts a Hive TIMESTAMP value, given as {@link java.sql.Timestamp} or {@link
+     * LocalDateTime}, to a Paimon {@link Timestamp}; {@code null} stays {@code null}.
+     */
+    @Override
+    public Timestamp convert(Object value) {
+        if (value instanceof java.sql.Timestamp) {
+            return Timestamp.fromSQLTimestamp((java.sql.Timestamp) value);
+        }
+        return value == null ? null : Timestamp.fromLocalDateTime((LocalDateTime) value);
+    }
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonVarcharObjectInspector.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonVarcharObjectInspector.java
index 03ce70c8b..8a6a31c3e 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonVarcharObjectInspector.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonVarcharObjectInspector.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
/** {@link AbstractPrimitiveJavaObjectInspector} for VARCHAR type. */
public class PaimonVarcharObjectInspector extends
AbstractPrimitiveJavaObjectInspector
- implements HiveVarcharObjectInspector {
+ implements HiveVarcharObjectInspector, WriteableObjectInspector {
private final int len;
@@ -59,4 +59,9 @@ public class PaimonVarcharObjectInspector extends
AbstractPrimitiveJavaObjectIns
return o;
}
}
+
+    /**
+     * Converts a Hive VARCHAR value into a {@link BinaryString} built from the value's {@code
+     * toString()} form.
+     *
+     * @param value the Hive value, may be {@code null}
+     * @return {@code null} for a {@code null} input, otherwise the string form as a {@link
+     *     BinaryString}
+     */
+    @Override
+    public BinaryString convert(Object value) {
+        if (value == null) {
+            return null;
+        }
+        String text = value.toString();
+        return BinaryString.fromString(text);
+    }
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/WriteableObjectInspector.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/WriteableObjectInspector.java
new file mode 100644
index 000000000..2e05dbaed
--- /dev/null
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/WriteableObjectInspector.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.paimon.hive.objectinspector;
+
+/** Object inspector that converts values coming from Hive into the representation Paimon uses. */
+public interface WriteableObjectInspector {
+
+    /**
+     * Converts a single Hive-side value.
+     *
+     * @param value the Hive-side value, possibly {@code null}
+     * @return the converted value
+     */
+    Object convert(Object value);
+}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
new file mode 100644
index 000000000..c2630bfc1
--- /dev/null
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.hive.PaimonJobConf;
+import org.apache.paimon.hive.SearchArgumentToPredicateConverter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+
+import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** Utilities for creating a {@link FileStoreTable} and a filter {@link Predicate} from a {@link JobConf}. */
+public class HiveUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HiveUtils.class);
+
+    /** Static utility class; not meant to be instantiated. */
+    private HiveUtils() {}
+
+    /**
+     * Creates the {@link FileStoreTable} stored at the table location recorded in {@code jobConf}.
+     *
+     * <p>Catalog options are extracted from the job configuration, and {@link CoreOptions#PATH} is
+     * set to the location reported by {@link PaimonJobConf#getLocation()}.
+     *
+     * @param jobConf Hive job configuration carrying the table location and catalog options
+     * @return the table at that location
+     */
+    public static FileStoreTable createFileStoreTable(JobConf jobConf) {
+        PaimonJobConf wrapper = new PaimonJobConf(jobConf);
+        Options options = PaimonJobConf.extractCatalogConfig(jobConf);
+        options.set(CoreOptions.PATH, wrapper.getLocation());
+        CatalogContext catalogContext = CatalogContext.create(options, jobConf);
+        return FileStoreTableFactory.create(catalogContext);
+    }
+
+    /**
+     * Converts the Hive {@link SearchArgument} found in {@code jobConf} into a {@link Predicate}
+     * over the fields of {@code tableSchema}.
+     *
+     * @param tableSchema schema whose field names and types the predicate refers to
+     * @param jobConf Hive job configuration that may carry a pushed-down search argument
+     * @return empty when {@code jobConf} carries no search argument; otherwise the result of
+     *     {@link SearchArgumentToPredicateConverter#convert()}
+     */
+    public static Optional<Predicate> createPredicate(TableSchema tableSchema, JobConf jobConf) {
+        SearchArgument sarg = ConvertAstToSearchArg.createFromConf(jobConf);
+        if (sarg == null) {
+            return Optional.empty();
+        }
+        SearchArgumentToPredicateConverter converter =
+                new SearchArgumentToPredicateConverter(
+                        sarg,
+                        tableSchema.fieldNames(),
+                        tableSchema.logicalRowType().getFieldTypes());
+        return converter.convert();
+    }
+}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FileStoreTestUtils.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FileStoreTestUtils.java
index 12dce9978..016aced0e 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FileStoreTestUtils.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FileStoreTestUtils.java
@@ -33,15 +33,26 @@ import java.util.List;
/** Test utils related to {@link FileStore}. */
public class FileStoreTestUtils {
- private static final String TABLE_NAME = "hive_test_table";
+ public static final String TABLE_NAME = "hive_test_table";
- private static final String DATABASE_NAME = "default";
+ public static final String DATABASE_NAME = "default";
private static final Identifier TABLE_IDENTIFIER =
Identifier.create(DATABASE_NAME, TABLE_NAME);
+    /**
+     * Creates the test table under {@code TABLE_IDENTIFIER} ({@code DATABASE_NAME}.{@code
+     * TABLE_NAME}) by delegating to the identifier-taking overload with a {@code null} identifier.
+     */
public static Table createFileStoreTable(
Options conf, RowType rowType, List<String> partitionKeys,
List<String> primaryKeys)
throws Exception {
+ return createFileStoreTable(conf, rowType, partitionKeys, primaryKeys,
null);
+ }
+
+ public static Table createFileStoreTable(
+ Options conf,
+ RowType rowType,
+ List<String> partitionKeys,
+ List<String> primaryKeys,
+ Identifier identifier)
+ throws Exception {
+ Identifier identifierNotNull = identifier == null ? TABLE_IDENTIFIER :
identifier;
// create CatalogContext using the options
CatalogContext catalogContext = CatalogContext.create(conf);
Catalog catalog = CatalogFactory.createCatalog(catalogContext);
@@ -49,10 +60,10 @@ public class FileStoreTestUtils {
catalog.createDatabase(DATABASE_NAME, false);
// create table
catalog.createTable(
- TABLE_IDENTIFIER,
+ identifierNotNull,
new Schema(rowType.getFields(), partitionKeys, primaryKeys,
conf.toMap(), ""),
false);
- Table table = catalog.getTable(TABLE_IDENTIFIER);
+ Table table = catalog.getTable(identifierNotNull);
catalog.close();
return table;
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
new file mode 100644
index 000000000..69bc691dc
--- /dev/null
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
@@ -0,0 +1,1126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.WriteMode;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.hive.mapred.PaimonOutputFormat;
+import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.StringUtils;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.hive.FileStoreTestUtils.DATABASE_NAME;
+import static org.apache.paimon.hive.FileStoreTestUtils.TABLE_NAME;
+import static
org.apache.paimon.hive.RandomGenericRowDataGenerator.randomBigDecimal;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link PaimonStorageHandler} and {@link PaimonOutputFormat}.
*/
+@RunWith(PaimonEmbeddedHiveRunner.class)
+public class HiveWriteITCase {
+
+    @ClassRule public static TemporaryFolder folder = new TemporaryFolder();
+
+    @HiveSQL(files = {})
+    private static HiveShell hiveShell;
+
+    /** Hive execution engine, chosen once per class in {@link #beforeClass()}. */
+    private static String engine;
+
+    /** Random commit user generated for each test in {@link #before()}. */
+    private String commitUser;
+
+    /** Next identifier passed to table commits; reset to 0 before each test. */
+    private long commitIdentifier;
+
+    @BeforeClass
+    public static void beforeClass() {
+        // only run with mr
+        engine = "mr";
+    }
+
+    /** Selects the execution engine, creates and switches to {@code test_db}, resets commit state. */
+    @Before
+    public void before() {
+        // use the class-level engine setting instead of repeating the literal
+        hiveShell.execute("SET hive.execution.engine=" + engine);
+
+        hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
+        hiveShell.execute("USE test_db");
+
+        commitUser = UUID.randomUUID().toString();
+        commitIdentifier = 0;
+    }
+
+    /** Drops {@code test_db} together with its tables (CASCADE). */
+    @After
+    public void after() {
+        hiveShell.execute("DROP DATABASE IF EXISTS test_db CASCADE");
+    }
+
+    /** Same as the five-argument variant, using the default Paimon table name. */
+    private String createChangelogExternalTable(
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            List<InternalRow> data)
+            throws Exception {
+        String useDefaultName = "";
+        return createChangelogExternalTable(
+                rowType, partitionKeys, primaryKeys, data, useDefaultName);
+    }
+
+    /**
+     * Creates a primary-key Paimon table (2 buckets, Avro files) in a fresh warehouse, writes
+     * {@code data} into it and registers it as a Hive external table.
+     *
+     * @param tableName Paimon table name; blank or null means {@code TABLE_NAME}
+     * @return the name of the Hive external table
+     */
+    private String createChangelogExternalTable(
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            List<InternalRow> data,
+            String tableName)
+            throws Exception {
+        String warehouse = folder.newFolder().toURI().toString();
+        String paimonTableName;
+        if (StringUtils.isNullOrWhitespaceOnly(tableName)) {
+            paimonTableName = TABLE_NAME;
+        } else {
+            paimonTableName = tableName;
+        }
+        String location = String.format("%s/default.db/%s", warehouse, paimonTableName);
+
+        Options options = new Options();
+        options.set(CatalogOptions.WAREHOUSE, warehouse);
+        options.set(CoreOptions.BUCKET, 2);
+        options.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+
+        Table table =
+                FileStoreTestUtils.createFileStoreTable(
+                        options,
+                        rowType,
+                        partitionKeys,
+                        primaryKeys,
+                        Identifier.create(DATABASE_NAME, paimonTableName));
+        return writeData(table, location, data);
+    }
+
+    /** Same as the four-argument variant, using the default Paimon table name. */
+    private String createAppendOnlyExternalTable(
+            RowType rowType, List<String> partitionKeys, List<InternalRow> data) throws Exception {
+        String useDefaultName = "";
+        return createAppendOnlyExternalTable(rowType, partitionKeys, data, useDefaultName);
+    }
+
+    /**
+     * Creates an append-only Paimon table (2 buckets, Avro files, no primary key) in a fresh
+     * warehouse, writes {@code data} into it and registers it as a Hive external table.
+     *
+     * @param tableName Paimon table name; blank or null means {@code TABLE_NAME}
+     * @return the name of the Hive external table
+     */
+    private String createAppendOnlyExternalTable(
+            RowType rowType, List<String> partitionKeys, List<InternalRow> data, String tableName)
+            throws Exception {
+        String warehouse = folder.newFolder().toURI().toString();
+        String paimonTableName =
+                StringUtils.isNullOrWhitespaceOnly(tableName) ? TABLE_NAME : tableName;
+        String location = String.format("%s/default.db/%s", warehouse, paimonTableName);
+
+        Options options = new Options();
+        options.set(CatalogOptions.WAREHOUSE, warehouse);
+        options.set(CoreOptions.BUCKET, 2);
+        options.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+        options.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+
+        List<String> noPrimaryKeys = Collections.emptyList();
+        Table table =
+                FileStoreTestUtils.createFileStoreTable(
+                        options,
+                        rowType,
+                        partitionKeys,
+                        noPrimaryKeys,
+                        Identifier.create(DATABASE_NAME, paimonTableName));
+        return writeData(table, location, data);
+    }
+
+    /**
+     * Writes {@code data} into {@code table}, committing at random points along the way, then
+     * registers {@code path} as a new Hive external table with a random name.
+     *
+     * @param path location of the Paimon table, used as the Hive table LOCATION
+     * @return the name of the created Hive table
+     */
+    private String writeData(Table table, String path, List<InternalRow> data) throws Exception {
+        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+        // try-with-resources so the writer is released even if a write or commit fails
+        try (StreamTableWrite write = streamWriteBuilder.newWrite()) {
+            StreamTableCommit commit = streamWriteBuilder.newCommit();
+            for (InternalRow rowData : data) {
+                write.write(rowData);
+                // commit at random points so the data ends up spread over several commits
+                if (ThreadLocalRandom.current().nextInt(5) == 0) {
+                    commit.commit(commitIdentifier, write.prepareCommit(false, commitIdentifier));
+                    commitIdentifier++;
+                }
+            }
+            commit.commit(commitIdentifier, write.prepareCommit(true, commitIdentifier));
+            commitIdentifier++;
+        }
+
+        String tableName = "test_table_" + (UUID.randomUUID().toString().substring(0, 4));
+        hiveShell.execute(
+                String.join(
+                        "\n",
+                        Arrays.asList(
+                                "CREATE EXTERNAL TABLE " + tableName + " ",
+                                "STORED BY '" + PaimonStorageHandler.class.getName() + "'",
+                                "LOCATION '" + path + "'")));
+        return tableName;
+    }
+
+    /** Inserts literal rows into an empty partitioned append-only table and reads them back. */
+    @Test
+    public void testInsert() throws Exception {
+        List<InternalRow> emptyData = Collections.emptyList();
+
+        String outputTableName =
+                createAppendOnlyExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.INT(),
+                                    DataTypes.BIGINT(),
+                                    DataTypes.STRING()
+                                },
+                                new String[] {"pt", "a", "b", "c"}),
+                        Collections.singletonList("pt"),
+                        emptyData,
+                        "hive_test_table_output");
+
+        hiveShell.execute(
+                "insert into " + outputTableName + " values (1,2,3,'Hello'),(4,5,6,'Fine')");
+        List<String> select = hiveShell.executeQuery("select * from " + outputTableName);
+        // the two rows land in different partitions (pt=1, pt=4); do not rely on scan order
+        assertThat(select).containsExactlyInAnyOrder("1\t2\t3\tHello", "4\t5\t6\tFine");
+    }
+
+    /** Each Hive insert into a primary-key table must produce exactly one snapshot. */
+    @Test
+    public void testWriteOnlyWithChangeLogTableOption() throws Exception {
+        String innerName = "hive_test_table_output";
+        int insertTimes = 5;
+
+        String path = folder.newFolder().toURI().toString();
+        String tablePath = String.format("%s/default.db/%s", path, innerName);
+        Options conf = new Options();
+        conf.set(CatalogOptions.WAREHOUSE, path);
+        conf.set(CoreOptions.BUCKET, 1);
+        conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+        conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+        Identifier identifier = Identifier.create(DATABASE_NAME, innerName);
+        Table table =
+                FileStoreTestUtils.createFileStoreTable(
+                        conf,
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.INT(),
+                                    DataTypes.INT(),
+                                    DataTypes.STRING(),
+                                },
+                                new String[] {"pt", "a", "b", "c"}),
+                        Collections.singletonList("pt"),
+                        Arrays.asList("a", "pt"),
+                        identifier);
+        String tableName = "test_table_" + (UUID.randomUUID().toString().substring(0, 4));
+        hiveShell.execute(
+                String.join(
+                        "\n",
+                        Arrays.asList(
+                                "CREATE EXTERNAL TABLE " + tableName + " ",
+                                "STORED BY '" + PaimonStorageHandler.class.getName() + "'",
+                                "LOCATION '" + tablePath + "'")));
+        for (int i = 0; i < insertTimes; i++) {
+            hiveShell.execute(
+                    "insert into " + tableName + " values (1,2,3,'Hello'),(4,5,6,'Fine')");
+        }
+        TableScan scan = table.newReadBuilder().newStreamScan();
+        DataSplit split = (DataSplit) scan.plan().splits().get(0);
+        // one snapshot per insert: no additional compaction snapshot was committed
+        Assert.assertEquals(insertTimes, split.snapshotId());
+    }
+
+    /** Each Hive insert into an append-only table must produce exactly one snapshot. */
+    @Test
+    public void testWriteOnlyWithAppendOnlyTableOption() throws Exception {
+        String innerName = "hive_test_table_output";
+        int maxCompact = 3;
+        String path = folder.newFolder().toURI().toString();
+        String tablePath = String.format("%s/default.db/%s", path, innerName);
+        Options conf = new Options();
+        conf.set(CatalogOptions.WAREHOUSE, path);
+        conf.set(CoreOptions.BUCKET, 1);
+        conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+        conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+        conf.set(CoreOptions.COMPACTION_MAX_FILE_NUM, maxCompact);
+        Identifier identifier = Identifier.create(DATABASE_NAME, innerName);
+        Table table =
+                FileStoreTestUtils.createFileStoreTable(
+                        conf,
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.INT(),
+                                    DataTypes.INT(),
+                                    DataTypes.STRING(),
+                                },
+                                new String[] {"pt", "a", "b", "c"}),
+                        Collections.singletonList("pt"),
+                        Collections.emptyList(),
+                        identifier);
+        String tableName = "test_table_" + (UUID.randomUUID().toString().substring(0, 4));
+        hiveShell.execute(
+                String.join(
+                        "\n",
+                        Arrays.asList(
+                                "CREATE EXTERNAL TABLE " + tableName + " ",
+                                "STORED BY '" + PaimonStorageHandler.class.getName() + "'",
+                                "LOCATION '" + tablePath + "'")));
+        // presumably enough files to reach the COMPACTION_MAX_FILE_NUM threshold set above
+        for (int i = 0; i < maxCompact; i++) {
+            hiveShell.execute(
+                    "insert into " + tableName + " values (1,2,3,'Hello'),(4,5,6,'Fine')");
+        }
+        TableScan scan = table.newReadBuilder().newStreamScan();
+        DataSplit split = (DataSplit) scan.plan().splits().get(0);
+        // one snapshot per insert: no additional compaction snapshot was committed
+        Assert.assertEquals(maxCompact, split.snapshotId());
+    }
+
+    /** Copies a partitioned primary-key table into a partitioned append-only table. */
+    @Test
+    public void testInsertFromSelectWithPartitionWithPk() throws Exception {
+        List<InternalRow> sourceRows =
+                Arrays.asList(
+                        GenericRow.of(1, 10, 100L, BinaryString.fromString("Hi")),
+                        GenericRow.of(2, 10, 200L, BinaryString.fromString("Hello")),
+                        GenericRow.of(1, 20, 300L, BinaryString.fromString("World")),
+                        GenericRow.of(1, 10, 100L, BinaryString.fromString("Hi Again")),
+                        GenericRow.ofKind(
+                                RowKind.DELETE, 1, 20, 300L, BinaryString.fromString("World")),
+                        GenericRow.of(2, 20, 100L, null),
+                        GenericRow.of(1, 30, 200L, BinaryString.fromString("Store")));
+
+        // source and sink share the same schema and partitioning
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING()
+                        },
+                        new String[] {"pt", "a", "b", "c"});
+        List<String> partitionKeys = Collections.singletonList("pt");
+
+        String sourceTable =
+                createChangelogExternalTable(
+                        rowType, partitionKeys, Arrays.asList("pt", "a"), sourceRows);
+        String sinkTable =
+                createAppendOnlyExternalTable(
+                        rowType, partitionKeys, Collections.emptyList(), "hive_test_table_output");
+
+        hiveShell.execute("insert into " + sinkTable + " SELECT * FROM " + sourceTable);
+
+        Object[] actual = hiveShell.executeStatement("select * from " + sinkTable).toArray();
+        Object[] expected = hiveShell.executeStatement("select * from " + sourceTable).toArray();
+        assertThat(actual).containsExactlyInAnyOrder(expected);
+    }
+
+    /** Copies an unpartitioned primary-key table with a DECIMAL column into an append-only table. */
+    @Test
+    public void testInsertFromSelectNoPartitionWithPk() throws Exception {
+        List<InternalRow> sourceRows =
+                Arrays.asList(
+                        GenericRow.of(
+                                1,
+                                10L,
+                                BinaryString.fromString("Hi"),
+                                Decimal.fromBigDecimal(randomBigDecimal(5, 3), 5, 3)),
+                        GenericRow.of(
+                                1,
+                                20L,
+                                BinaryString.fromString("Hello"),
+                                Decimal.fromBigDecimal(randomBigDecimal(5, 3), 5, 3)),
+                        GenericRow.of(
+                                2,
+                                30L,
+                                BinaryString.fromString("World"),
+                                Decimal.fromBigDecimal(randomBigDecimal(5, 3), 5, 3)),
+                        GenericRow.of(
+                                1,
+                                10L,
+                                BinaryString.fromString("Hi Again"),
+                                Decimal.fromBigDecimal(randomBigDecimal(5, 3), 5, 3)),
+                        GenericRow.ofKind(
+                                RowKind.DELETE,
+                                2,
+                                30L,
+                                BinaryString.fromString("World"),
+                                Decimal.fromBigDecimal(randomBigDecimal(5, 3), 5, 3)),
+                        GenericRow.of(
+                                2, 40L, null, Decimal.fromBigDecimal(randomBigDecimal(5, 3), 5, 3)),
+                        GenericRow.of(
+                                3,
+                                50L,
+                                BinaryString.fromString("Store"),
+                                Decimal.fromBigDecimal(randomBigDecimal(5, 3), 5, 3)));
+
+        // source and sink share the same schema; neither is partitioned
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.STRING(),
+                            DataTypes.DECIMAL(5, 3)
+                        },
+                        new String[] {"a", "b", "c", "d"});
+
+        String sourceTable =
+                createChangelogExternalTable(
+                        rowType, Collections.emptyList(), Arrays.asList("a", "b"), sourceRows);
+        String sinkTable =
+                createAppendOnlyExternalTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        "hive_test_table_output");
+
+        hiveShell.execute("insert into " + sinkTable + " SELECT * FROM " + sourceTable);
+
+        Object[] actual = hiveShell.executeStatement("select * from " + sinkTable).toArray();
+        Object[] expected = hiveShell.executeStatement("select * from " + sourceTable).toArray();
+        assertThat(actual).containsExactlyInAnyOrder(expected);
+    }
+
+    /** Copies the rows matching {@code a > 10} from a primary-key table into an append-only one. */
+    @Test
+    public void testInsertFromSelectWhereWithPartitionWithPk() throws Exception {
+        List<InternalRow> sourceRows =
+                Arrays.asList(
+                        GenericRow.of(1, 10, 100L, BinaryString.fromString("Hi")),
+                        GenericRow.of(2, 10, 200L, BinaryString.fromString("Hello")),
+                        GenericRow.of(1, 20, 300L, BinaryString.fromString("World")),
+                        GenericRow.of(1, 10, 100L, BinaryString.fromString("Hi Again")),
+                        GenericRow.ofKind(
+                                RowKind.DELETE, 1, 20, 300L, BinaryString.fromString("World")),
+                        GenericRow.of(2, 20, 100L, null),
+                        GenericRow.of(1, 30, 200L, BinaryString.fromString("Store")));
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING()
+                        },
+                        new String[] {"pt", "a", "b", "c"});
+        List<String> partitionKeys = Collections.singletonList("pt");
+
+        String sourceTable =
+                createChangelogExternalTable(
+                        rowType, partitionKeys, Arrays.asList("pt", "a"), sourceRows);
+        String sinkTable =
+                createAppendOnlyExternalTable(
+                        rowType, partitionKeys, Collections.emptyList(), "hive_test_table_output");
+
+        String filter = " where a > 10";
+        hiveShell.execute("insert into " + sinkTable + " SELECT * FROM " + sourceTable + filter);
+
+        Object[] actual = hiveShell.executeStatement("select * from " + sinkTable).toArray();
+        Object[] expected =
+                hiveShell.executeStatement("select * from " + sourceTable + filter).toArray();
+        assertThat(actual).containsExactlyInAnyOrder(expected);
+    }
+
+    /** Copies an ordered SELECT from one primary-key table into another with a different key. */
+    @Test
+    public void testInsertFromSelectOrderWithPartitionWithPk() throws Exception {
+        List<InternalRow> sourceRows =
+                Arrays.asList(
+                        GenericRow.of(1, 10, 100L, BinaryString.fromString("Hi")),
+                        GenericRow.of(2, 10, 200L, BinaryString.fromString("Hello")),
+                        GenericRow.of(1, 20, 300L, BinaryString.fromString("World")),
+                        GenericRow.of(1, 10, 100L, BinaryString.fromString("Hi Again")),
+                        GenericRow.ofKind(
+                                RowKind.DELETE, 1, 20, 300L, BinaryString.fromString("World")),
+                        GenericRow.of(2, 20, 100L, null),
+                        GenericRow.of(1, 30, 200L, BinaryString.fromString("Store")));
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING()
+                        },
+                        new String[] {"pt", "a", "b", "c"});
+        List<String> partitionKeys = Collections.singletonList("pt");
+
+        String sourceTable =
+                createChangelogExternalTable(
+                        rowType, partitionKeys, Arrays.asList("pt", "a"), sourceRows);
+        // the sink is keyed on (pt, b) rather than (pt, a)
+        String sinkTable =
+                createChangelogExternalTable(
+                        rowType,
+                        partitionKeys,
+                        Arrays.asList("pt", "b"),
+                        Collections.emptyList(),
+                        "hive_test_table_output");
+
+        String ordering = " order by b desc";
+        hiveShell.execute("insert into " + sinkTable + " SELECT * FROM " + sourceTable + ordering);
+
+        Object[] actual = hiveShell.executeStatement("select * from " + sinkTable).toArray();
+        Object[] expected =
+                hiveShell.executeStatement("select * from " + sourceTable + ordering).toArray();
+        assertThat(actual).containsExactlyInAnyOrder(expected);
+    }
+
+    /** Inserts the result of a LEFT JOIN between two append-only tables into a third one. */
+    @Test
+    public void testInsertFromJoiningWithPartitionWithPk() throws Exception {
+        List<InternalRow> leftRows =
+                Arrays.asList(
+                        GenericRow.of(1, 10, 100L, BinaryString.fromString("Hi")),
+                        GenericRow.of(2, 10, 200L, BinaryString.fromString("Hello")),
+                        GenericRow.of(1, 20, 300L, BinaryString.fromString("World")),
+                        GenericRow.of(1, 10, 100L, BinaryString.fromString("Hi Again")));
+        List<InternalRow> rightRows =
+                Arrays.asList(
+                        GenericRow.of(1, 10, 1L, BinaryString.fromString("HZY")),
+                        GenericRow.of(2, 10, 2L, BinaryString.fromString("LN")),
+                        GenericRow.of(1, 20, 3L, BinaryString.fromString("GOOD")),
+                        GenericRow.of(1, 10, 4L, BinaryString.fromString("")));
+
+        // all three tables share the same schema and partitioning
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING()
+                        },
+                        new String[] {"pt", "a", "b", "c"});
+        List<String> partitionKeys = Collections.singletonList("pt");
+
+        String leftTable = createAppendOnlyExternalTable(rowType, partitionKeys, leftRows);
+        String rightTable = createAppendOnlyExternalTable(rowType, partitionKeys, rightRows);
+        String sinkTable =
+                createAppendOnlyExternalTable(
+                        rowType, partitionKeys, Collections.emptyList(), "hive_test_table_output");
+
+        String joinQuery =
+                " SELECT r.pt as pt,l.a as a,l.b as b ,r.c as c FROM "
+                        + leftTable
+                        + " l left join "
+                        + rightTable
+                        + " r on l.a = r.a";
+        hiveShell.execute("insert into " + sinkTable + joinQuery);
+
+        Object[] actual = hiveShell.executeStatement("select * from " + sinkTable).toArray();
+        Object[] expected = hiveShell.executeStatement(joinQuery).toArray();
+        assertThat(actual).containsExactlyInAnyOrder(expected);
+    }
+
+    /** Copies randomly generated rows covering every supported type into a primary-key table. */
+    @Test
+    public void testInsertAllSupportedTypes() throws Exception {
+        String root = folder.newFolder().toString();
+        // the four-argument createFileStoreTable overload uses DATABASE_NAME.TABLE_NAME
+        String tablePath = String.format("%s/%s.db/%s", root, DATABASE_NAME, TABLE_NAME);
+        Options conf = new Options();
+        conf.set(CatalogOptions.WAREHOUSE, root);
+        conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+        Table table =
+                FileStoreTestUtils.createFileStoreTable(
+                        conf,
+                        RandomGenericRowDataGenerator.ROW_TYPE,
+                        Collections.emptyList(),
+                        Collections.singletonList("f_int"));
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        // at least one row; with zero rows the final comparison would pass trivially
+        int rowCount = random.nextInt(1, 10);
+        List<GenericRow> input = new ArrayList<>(rowCount);
+        while (input.size() < rowCount) {
+            GenericRow rowData = RandomGenericRowDataGenerator.generate();
+            // pk must not be null (field 3 is assumed to be f_int in ROW_TYPE)
+            if (!rowData.isNullAt(3)) {
+                input.add(rowData);
+            }
+        }
+
+        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+        try (StreamTableWrite write = streamWriteBuilder.newWrite()) {
+            StreamTableCommit commit = streamWriteBuilder.newCommit();
+            for (GenericRow rowData : input) {
+                write.write(rowData);
+            }
+            commit.commit(0, write.prepareCommit(true, 0));
+        }
+
+        hiveShell.execute(
+                String.join(
+                        "\n",
+                        Arrays.asList(
+                                "CREATE EXTERNAL TABLE test_table",
+                                "STORED BY '" + PaimonStorageHandler.class.getName() + "'",
+                                "LOCATION '" + tablePath + "'")));
+
+        List<InternalRow> emptyData = Collections.emptyList();
+        String outputTableName =
+                createChangelogExternalTable(
+                        RandomGenericRowDataGenerator.ROW_TYPE,
+                        Collections.emptyList(),
+                        Collections.singletonList("f_int"),
+                        emptyData,
+                        "hive_test_table_output");
+        hiveShell.execute("insert into " + outputTableName + " SELECT * FROM test_table");
+        List<Object[]> select = hiveShell.executeStatement("select * from " + outputTableName);
+        List<Object[]> expect = hiveShell.executeStatement("select * from test_table");
+        assertThat(select.toArray()).containsExactlyInAnyOrder(expect.toArray());
+    }
+
+    /** Round-trips ARRAY columns of CHAR, VARCHAR and DECIMAL elements through INSERT ... SELECT. */
+    @Test
+    public void testInsertArrayOfPrimitiveType() throws Exception {
+        List<InternalRow> sourceRows =
+                Arrays.asList(
+                        GenericRow.of(
+                                1,
+                                new GenericArray(
+                                        new Object[] {BinaryString.fromString("xiaoyang")}),
+                                new GenericArray(new Object[] {BinaryString.fromString("hi")}),
+                                new GenericArray(
+                                        new Object[] {
+                                            Decimal.fromBigDecimal(randomBigDecimal(5, 3), 5, 3)
+                                        })),
+                        GenericRow.of(
+                                1,
+                                new GenericArray(new Object[] {BinaryString.fromString("hzy")}),
+                                new GenericArray(new Object[] {BinaryString.fromString("hello")}),
+                                new GenericArray(
+                                        new Object[] {
+                                            Decimal.fromBigDecimal(randomBigDecimal(5, 3), 5, 3)
+                                        })));
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(),
+                            DataTypes.ARRAY(DataTypes.CHAR(20)),
+                            DataTypes.ARRAY(DataTypes.VARCHAR(100)),
+                            DataTypes.ARRAY(DataTypes.DECIMAL(5, 3))
+                        },
+                        new String[] {"a", "b", "c", "d"});
+
+        String sourceTable =
+                createChangelogExternalTable(
+                        rowType, Collections.emptyList(), Arrays.asList("a"), sourceRows);
+        String sinkTable =
+                createAppendOnlyExternalTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        "hive_test_table_output");
+
+        hiveShell.execute("insert into " + sinkTable + " SELECT * FROM " + sourceTable);
+
+        Object[] actual = hiveShell.executeStatement("select * from " + sinkTable).toArray();
+        Object[] expected = hiveShell.executeStatement("select * from " + sourceTable).toArray();
+        assertThat(actual).containsExactlyInAnyOrder(expected);
+    }
+
+    /** Round-trips two- and three-level nested STRING arrays through INSERT ... SELECT. */
+    @Test
+    public void testInsertArrayOfArrayType() throws Exception {
+        GenericArray twoLevels =
+                new GenericArray(
+                        new Object[] {
+                            new GenericArray(new Object[] {BinaryString.fromString("xiaoyang")})
+                        });
+        GenericArray threeLevels =
+                new GenericArray(
+                        new Object[] {
+                            new GenericArray(
+                                    new Object[] {
+                                        new GenericArray(
+                                                new Object[] {BinaryString.fromString("xiaoyang")})
+                                    })
+                        });
+        List<InternalRow> sourceRows =
+                Collections.singletonList(GenericRow.of(1, twoLevels, threeLevels));
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(),
+                            DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING())),
+                            DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING())))
+                        },
+                        new String[] {"a", "b", "c"});
+
+        String sourceTable =
+                createChangelogExternalTable(
+                        rowType, Collections.emptyList(), Arrays.asList("a"), sourceRows);
+        String sinkTable =
+                createAppendOnlyExternalTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        "hive_test_table_output");
+
+        hiveShell.execute("insert into " + sinkTable + " SELECT * FROM " + sourceTable);
+
+        Object[] actual = hiveShell.executeStatement("select * from " + sinkTable).toArray();
+        Object[] expected = hiveShell.executeStatement("select * from " + sourceTable).toArray();
+        assertThat(actual).containsExactlyInAnyOrder(expected);
+    }
+
+    @Test
+    public void testInsertArrayOfMapType() throws Exception {
+        // Column b is ARRAY<MAP<STRING, STRING>> holding one {"xiaoyang": "xiaolan"} map.
+        GenericMap entry =
+                new GenericMap(
+                        Collections.singletonMap(
+                                BinaryString.fromString("xiaoyang"),
+                                BinaryString.fromString("xiaolan")));
+        List<InternalRow> data =
+                Arrays.asList(GenericRow.of(1, new GenericArray(new Object[] {entry})));
+
+        // Source and sink share one schema.
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(),
+                            DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
+                        },
+                        new String[] {"a", "b"});
+        List<InternalRow> noRows = Collections.emptyList();
+        String sourceTable =
+                createChangelogExternalTable(
+                        rowType, Collections.emptyList(), Arrays.asList("a"), data);
+        String sinkTable =
+                createAppendOnlyExternalTable(
+                        rowType, Collections.emptyList(), noRows, "hive_test_table_output");
+
+        // Copy through Hive, then compare both tables as seen by Hive.
+        hiveShell.execute("insert into " + sinkTable + " SELECT * FROM " + sourceTable);
+        List<Object[]> actual = hiveShell.executeStatement("select * from " + sinkTable);
+        List<Object[]> expected = hiveShell.executeStatement("select * from " + sourceTable);
+        assertThat(actual.toArray()).containsExactlyInAnyOrder(expected.toArray());
+    }
+
+    @Test
+    public void testInsertArrayOfRowType() throws Exception {
+        // Column b is ARRAY<ROW<b1 ROW<b1_1 STRING>, b2 STRING>> holding one element.
+        GenericRow element =
+                GenericRow.of(
+                        GenericRow.of(BinaryString.fromString("xiaoyang")),
+                        BinaryString.fromString("xiaolan"));
+        List<InternalRow> data =
+                Arrays.asList(GenericRow.of(1, new GenericArray(new Object[] {element})));
+
+        // Source and sink share one schema; nested field ids are fixed explicitly.
+        DataType nestedRow = DataTypes.ROW(new DataField(4, "b1_1", DataTypes.STRING()));
+        DataType elementRow =
+                DataTypes.ROW(
+                        new DataField(2, "b1", nestedRow),
+                        new DataField(3, "b2", DataTypes.STRING()));
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.ARRAY(elementRow)},
+                        new String[] {"a", "b"});
+        List<InternalRow> noRows = Collections.emptyList();
+        String sourceTable =
+                createChangelogExternalTable(
+                        rowType, Collections.emptyList(), Arrays.asList("a"), data);
+        String sinkTable =
+                createAppendOnlyExternalTable(
+                        rowType, Collections.emptyList(), noRows, "hive_test_table_output");
+
+        // Copy through Hive, then compare both tables as seen by Hive.
+        hiveShell.execute("insert into " + sinkTable + " SELECT * FROM " + sourceTable);
+        List<Object[]> actual = hiveShell.executeStatement("select * from " + sinkTable);
+        List<Object[]> expected = hiveShell.executeStatement("select * from " + sourceTable);
+        assertThat(actual.toArray()).containsExactlyInAnyOrder(expected.toArray());
+    }
+
+    @Test
+    public void testInsertMapOfPrimitiveType() throws Exception {
+        // Column b is MAP<STRING, STRING> holding {"xiaoyang": "xiaolan"}.
+        GenericMap map =
+                new GenericMap(
+                        Collections.singletonMap(
+                                BinaryString.fromString("xiaoyang"),
+                                BinaryString.fromString("xiaolan")));
+        List<InternalRow> data = Arrays.asList(GenericRow.of(1, map));
+
+        // Source and sink share one schema.
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(), DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())
+                        },
+                        new String[] {"a", "b"});
+        List<InternalRow> noRows = Collections.emptyList();
+        String sourceTable =
+                createChangelogExternalTable(
+                        rowType, Collections.emptyList(), Arrays.asList("a"), data);
+        String sinkTable =
+                createAppendOnlyExternalTable(
+                        rowType, Collections.emptyList(), noRows, "hive_test_table_output");
+
+        // Copy through Hive, then compare both tables as seen by Hive.
+        hiveShell.execute("insert into " + sinkTable + " SELECT * FROM " + sourceTable);
+        List<Object[]> actual = hiveShell.executeStatement("select * from " + sinkTable);
+        List<Object[]> expected = hiveShell.executeStatement("select * from " + sourceTable);
+        assertThat(actual.toArray()).containsExactlyInAnyOrder(expected.toArray());
+    }
+
+    @Test
+    public void testInsertMapOfArrayType() throws Exception {
+        // Column b is MAP<STRING, ARRAY<STRING>> holding {"xiaoyang": ["xiaoyang"]}.
+        GenericArray values =
+                new GenericArray(new Object[] {BinaryString.fromString("xiaoyang")});
+        GenericMap map =
+                new GenericMap(
+                        Collections.singletonMap(BinaryString.fromString("xiaoyang"), values));
+        List<InternalRow> data = Arrays.asList(GenericRow.of(1, map));
+
+        // Source and sink share one schema.
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(),
+                            DataTypes.MAP(DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING()))
+                        },
+                        new String[] {"a", "b"});
+        List<InternalRow> noRows = Collections.emptyList();
+        String sourceTable =
+                createChangelogExternalTable(
+                        rowType, Collections.emptyList(), Arrays.asList("a"), data);
+        String sinkTable =
+                createAppendOnlyExternalTable(
+                        rowType, Collections.emptyList(), noRows, "hive_test_table_output");
+
+        // Copy through Hive, then compare both tables as seen by Hive.
+        hiveShell.execute("insert into " + sinkTable + " SELECT * FROM " + sourceTable);
+        List<Object[]> actual = hiveShell.executeStatement("select * from " + sinkTable);
+        List<Object[]> expected = hiveShell.executeStatement("select * from " + sourceTable);
+        assertThat(actual.toArray()).containsExactlyInAnyOrder(expected.toArray());
+    }
+
+    @Test
+    public void testInsertMapOfMapType() throws Exception {
+        // Column b is MAP<STRING, MAP<STRING, STRING>> holding
+        // {"xiaolan": {"xiaoyang": "xiaolan"}}.
+        GenericMap inner =
+                new GenericMap(
+                        Collections.singletonMap(
+                                BinaryString.fromString("xiaoyang"),
+                                BinaryString.fromString("xiaolan")));
+        GenericMap outer =
+                new GenericMap(
+                        Collections.singletonMap(BinaryString.fromString("xiaolan"), inner));
+        List<InternalRow> data = Arrays.asList(GenericRow.of(1, outer));
+
+        // Source and sink share one schema.
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(),
+                            DataTypes.MAP(
+                                    DataTypes.STRING(),
+                                    DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
+                        },
+                        new String[] {"a", "b"});
+        List<InternalRow> noRows = Collections.emptyList();
+        String sourceTable =
+                createChangelogExternalTable(
+                        rowType, Collections.emptyList(), Arrays.asList("a"), data);
+        String sinkTable =
+                createAppendOnlyExternalTable(
+                        rowType, Collections.emptyList(), noRows, "hive_test_table_output");
+
+        // Copy through Hive, then compare both tables as seen by Hive.
+        hiveShell.execute("insert into " + sinkTable + " SELECT * FROM " + sourceTable);
+        List<Object[]> actual = hiveShell.executeStatement("select * from " + sinkTable);
+        List<Object[]> expected = hiveShell.executeStatement("select * from " + sourceTable);
+        assertThat(actual.toArray()).containsExactlyInAnyOrder(expected.toArray());
+    }
+
+    @Test
+    public void testInsertMapOfRowType() throws Exception {
+        // Column b is MAP<STRING, ROW<c1 ROW<c1_1 STRING>, c2 STRING>> holding one entry.
+        GenericRow value =
+                GenericRow.of(
+                        GenericRow.of(BinaryString.fromString("xiaoyang")),
+                        BinaryString.fromString("xiaolan"));
+        GenericMap map =
+                new GenericMap(
+                        Collections.singletonMap(BinaryString.fromString("xiaolan"), value));
+        List<InternalRow> data = Arrays.asList(GenericRow.of(1, map));
+
+        // Source and sink share one schema; nested field ids are fixed explicitly.
+        DataType nestedRow = DataTypes.ROW(new DataField(6, "c1_1", DataTypes.STRING()));
+        DataType valueRow =
+                DataTypes.ROW(
+                        new DataField(5, "c1", nestedRow),
+                        new DataField(7, "c2", DataTypes.STRING()));
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(), DataTypes.MAP(DataTypes.STRING(), valueRow)
+                        },
+                        new String[] {"a", "b"});
+        List<InternalRow> noRows = Collections.emptyList();
+        String sourceTable =
+                createChangelogExternalTable(
+                        rowType, Collections.emptyList(), Arrays.asList("a"), data);
+        String sinkTable =
+                createAppendOnlyExternalTable(
+                        rowType, Collections.emptyList(), noRows, "hive_test_table_output");
+
+        // Copy through Hive, then compare both tables as seen by Hive.
+        hiveShell.execute("insert into " + sinkTable + " SELECT * FROM " + sourceTable);
+        List<Object[]> actual = hiveShell.executeStatement("select * from " + sinkTable);
+        List<Object[]> expected = hiveShell.executeStatement("select * from " + sourceTable);
+        assertThat(actual.toArray()).containsExactlyInAnyOrder(expected.toArray());
+    }
+}
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
index 760388d31..83bb74d89 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.hive;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.WriteMode;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -37,6 +38,7 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.StringUtils;
import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
@@ -64,6 +66,9 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.paimon.hive.FileStoreTestUtils.DATABASE_NAME;
+import static org.apache.paimon.hive.FileStoreTestUtils.TABLE_NAME;
+
/** IT cases for {@link PaimonStorageHandler} and {@link PaimonInputFormat}. */
@RunWith(PaimonEmbeddedHiveRunner.class)
public class PaimonStorageHandlerITCase {
@@ -82,7 +87,8 @@ public class PaimonStorageHandlerITCase {
     public static void beforeClass() {
-        // TODO Currently FlinkEmbeddedHiveRunner can only be used for one test class,
-        // so we have to select engine randomly. Write our own Hive tester in the future.
-        engine = ThreadLocalRandom.current().nextBoolean() ? "mr" : "tez";
+        // TODO Currently FlinkEmbeddedHiveRunner can only be used for one test class, so only one
+        // engine runs per build. It is pinned to "mr" instead of a random mr/tez choice.
+        // NOTE(review): confirm why tez was dropped and restore the random choice if possible.
+        engine = "mr";
     }

     @Before
@@ -602,30 +608,71 @@ public class PaimonStorageHandlerITCase {
             List<String> primaryKeys,
             List<InternalRow> data)
             throws Exception {
+
+        return createChangelogExternalTable(rowType, partitionKeys, primaryKeys, data, "");
+    }
+
+    /**
+     * Creates a table with the given partition and primary keys under a new warehouse folder and
+     * passes it, together with {@code data}, to {@code writeData}, whose result is returned.
+     *
+     * @param tableName table name; {@link FileStoreTestUtils#TABLE_NAME} is used when it is null,
+     *     empty or whitespace only
+     */
+    private String createChangelogExternalTable(
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            List<InternalRow> data,
+            String tableName)
+            throws Exception {
         String path = folder.newFolder().toURI().toString();
-        String tablePath = String.format("%s/default.db/hive_test_table", path);
+        String resolvedTableName =
+                StringUtils.isNullOrWhitespaceOnly(tableName) ? TABLE_NAME : tableName;
+        // Derive the location from the same database/table names as the Identifier below so the
+        // two cannot drift apart (layout: <warehouse>/<database>.db/<table>).
+        String tablePath = String.format("%s/%s.db/%s", path, DATABASE_NAME, resolvedTableName);
         Options conf = new Options();
         conf.set(CatalogOptions.WAREHOUSE, path);
         conf.set(CoreOptions.BUCKET, 2);
         conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+        Identifier identifier = Identifier.create(DATABASE_NAME, resolvedTableName);
         Table table =
-                FileStoreTestUtils.createFileStoreTable(conf, rowType, partitionKeys, primaryKeys);
+                FileStoreTestUtils.createFileStoreTable(
+                        conf, rowType, partitionKeys, primaryKeys, identifier);
         return writeData(table, tablePath, data);
     }

     private String createAppendOnlyExternalTable(
             RowType rowType, List<String> partitionKeys, List<InternalRow> data) throws Exception {
+        return createAppendOnlyExternalTable(rowType, partitionKeys, data, "");
+    }
+
+    /**
+     * Creates an append-only table under a new warehouse folder and passes it, together with
+     * {@code data}, to {@code writeData}, whose result is returned.
+     *
+     * @param tableName table name; {@link FileStoreTestUtils#TABLE_NAME} is used when it is null,
+     *     empty or whitespace only
+     */
+    private String createAppendOnlyExternalTable(
+            RowType rowType, List<String> partitionKeys, List<InternalRow> data, String tableName)
+            throws Exception {
         String path = folder.newFolder().toURI().toString();
-        String tablePath = String.format("%s/default.db/hive_test_table", path);
+        String resolvedTableName =
+                StringUtils.isNullOrWhitespaceOnly(tableName) ? TABLE_NAME : tableName;
+        // Same layout as in createChangelogExternalTable: <warehouse>/<database>.db/<table>.
+        String tablePath = String.format("%s/%s.db/%s", path, DATABASE_NAME, resolvedTableName);
         Options conf = new Options();
         conf.set(CatalogOptions.WAREHOUSE, path);
         conf.set(CoreOptions.BUCKET, 2);
         conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
         conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+        Identifier identifier = Identifier.create(DATABASE_NAME, resolvedTableName);
         Table table =
                 FileStoreTestUtils.createFileStoreTable(
-                        conf, rowType, partitionKeys, Collections.emptyList());
+                        conf, rowType, partitionKeys, Collections.emptyList(), identifier);
         return writeData(table, tablePath, data);
     }
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/RandomGenericRowDataGenerator.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/RandomGenericRowDataGenerator.java
index e9ab13bfe..74d4c92c0 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/RandomGenericRowDataGenerator.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/RandomGenericRowDataGenerator.java
@@ -133,6 +133,7 @@ public class RandomGenericRowDataGenerator {
FIELD_COMMENTS.get(i)))
.collect(Collectors.toList()));
+ // todo generate with schema
public static GenericRow generate() {
ThreadLocalRandom random = ThreadLocalRandom.current();
byte[] randomBytes = new byte[random.nextInt(20)];
@@ -185,11 +186,13 @@ public class RandomGenericRowDataGenerator {
return builder.toString();
}
- private static BigDecimal randomBigDecimal(int precision, int scale) {
+ public static BigDecimal randomBigDecimal(int precision, int scale) {
ThreadLocalRandom random = ThreadLocalRandom.current();
StringBuilder builder = new StringBuilder();
+ // to avoid starting with 0
for (int i = 0; i < precision - scale; i++) {
- builder.append((char) (random.nextInt(10) + '0'));
+ int t = random.nextInt(10);
+ builder.append(t == 0 ? 1 : t);
}
builder.append('.');
for (int i = 0; i < scale; i++) {