This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f8a80e7b9 [hive] Introduce Hive writer (#752)
f8a80e7b9 is described below

commit f8a80e7b9ea5dfd70f1e8a304114ea0c0b8aef3c
Author: HZY <[email protected]>
AuthorDate: Wed Apr 26 15:31:28 2023 +0800

    [hive] Introduce Hive writer (#752)
---
 docs/content/engines/hive.md                       |   31 +-
 docs/content/engines/overview.md                   |    2 +-
 .../org/apache/paimon/hive/HiveDeserializer.java   |  308 ++++++
 .../java/org/apache/paimon/hive/HiveSchema.java    |   12 +
 .../java/org/apache/paimon/hive/PaimonJobConf.java |   15 +
 .../java/org/apache/paimon/hive/PaimonSerDe.java   |   26 +-
 .../apache/paimon/hive/PaimonStorageHandler.java   |   18 +-
 .../java/org/apache/paimon/hive/SchemaVisitor.java |  114 ++
 .../paimon/hive/mapred/PaimonInputFormat.java      |   35 +-
 .../paimon/hive/mapred/PaimonOutputCommitter.java  |  280 +++++
 .../paimon/hive/mapred/PaimonOutputFormat.java     |   55 +-
 .../paimon/hive/mapred/PaimonRecordWriter.java     |   98 ++
 .../hive/objectinspector/HivePaimonArray.java      |  196 ++++
 .../objectinspector/PaimonCharObjectInspector.java |    7 +-
 .../objectinspector/PaimonDateObjectInspector.java |   15 +-
 .../PaimonDecimalObjectInspector.java              |   17 +-
 .../PaimonObjectInspectorFactory.java              |   14 +
 .../PaimonStringObjectInspector.java               |    7 +-
 .../PaimonTimestampObjectInspector.java            |   16 +-
 .../PaimonVarcharObjectInspector.java              |    7 +-
 .../objectinspector/WriteableObjectInspector.java  |   25 +
 .../org/apache/paimon/hive/utils/HiveUtils.java    |   64 ++
 .../org/apache/paimon/hive/FileStoreTestUtils.java |   19 +-
 .../org/apache/paimon/hive/HiveWriteITCase.java    | 1126 ++++++++++++++++++++
 .../paimon/hive/PaimonStorageHandlerITCase.java    |   40 +-
 .../paimon/hive/RandomGenericRowDataGenerator.java |    7 +-
 26 files changed, 2493 insertions(+), 61 deletions(-)

diff --git a/docs/content/engines/hive.md b/docs/content/engines/hive.md
index 9691e3386..5016880a2 100644
--- a/docs/content/engines/hive.md
+++ b/docs/content/engines/hive.md
@@ -34,7 +34,7 @@ Paimon currently supports Hive 2.1, 2.1-cdh-6.3, 2.2, 2.3 and 
3.1.
 
 ## Execution Engine
 
-Paimon currently supports MR and Tez execution engine for Hive.
+Paimon currently supports MR and Tez execution engines for Hive read, and only the MR 
execution engine for Hive write.
 
 ## Installation
 
@@ -182,6 +182,35 @@ OK
 1      Table
 2      Store
 */
+
+-- Insert records into test table
+
+INSERT INTO test_table VALUES (3, 'Paimon');
+
+SELECT a, b FROM test_table ORDER BY a;
+
+/*
+OK
+1      Table
+2      Store
+3      Paimon
+*/
+-- Insert records into test table from other table
+
+INSERT INTO test_table SELECT a, b FROM test_table;
+
+SELECT a, b FROM test_table ORDER BY a;
+
+/*
+OK
+1      Table
+1      Table
+2      Store
+2      Store
+3      Paimon
+3      Paimon
+*/
+
 ```
 
 ## Hive Type Conversion
diff --git a/docs/content/engines/overview.md b/docs/content/engines/overview.md
index 172914082..16200161a 100644
--- a/docs/content/engines/overview.md
+++ b/docs/content/engines/overview.md
@@ -35,7 +35,7 @@ Apache Spark and Apache Hive.
 | Engine | Version                     | Feature                               
                                               | Read Pushdown      |
 
|--------|-----------------------------|--------------------------------------------------------------------------------------|--------------------|
 | Flink  | 1.17/1.16/1.15/1.14         | batch/streaming read, batch/streaming 
write, create/drop table, create/drop database | Projection, Filter |
-| Hive   | 3.1/2.3/2.2/2.1/2.1 CDH 6.3 | batch read                            
                                               | Projection, Filter |
+| Hive   | 3.1/2.3/2.2/2.1/2.1 CDH 6.3 | batch read, batch write               
                                               | Projection, Filter |
 | Spark  | 3.4/3.3/3.2/3.1             | batch read, batch write, create/drop 
table, create/drop database                     | Projection, Filter |
 | Spark  | 2.4                         | batch read                            
                                               | Projection, Filter |
 | Trino  | 388/358                     | batch read, create/drop table, 
create/drop database                                  | Projection, Filter |
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveDeserializer.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveDeserializer.java
new file mode 100644
index 000000000..005169079
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveDeserializer.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.hive.objectinspector.HivePaimonArray;
+import org.apache.paimon.hive.objectinspector.WriteableObjectInspector;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** A deserializer to deserialize Hive objects to {@link InternalRow}. */
+public class HiveDeserializer {
+    private final FieldDeserializer fieldDeserializer;
+
+    /**
+     * Builder to create a HiveDeserializer instance. Requires a Paimon 
HiveSchema and both the
+     * writer and source ObjectInspectors for converting the data.
+     */
+    static class Builder {
+        private HiveSchema schema;
+        private StructObjectInspector writerInspector;
+        private StructObjectInspector sourceInspector;
+
+        Builder schema(HiveSchema mainSchema) {
+            this.schema = mainSchema;
+            return this;
+        }
+
+        Builder writerInspector(StructObjectInspector inspector) {
+            this.writerInspector = inspector;
+            return this;
+        }
+
+        Builder sourceInspector(StructObjectInspector inspector) {
+            this.sourceInspector = inspector;
+            return this;
+        }
+
+        HiveDeserializer build() {
+            return new HiveDeserializer(
+                    schema, new ObjectInspectorPair(writerInspector, 
sourceInspector));
+        }
+    }
+
+    /**
+     * Deserializes the Hive result object to a Paimon record using the 
provided ObjectInspectors.
+     *
+     * @param data The Hive data to deserialize
+     * @return The resulting Paimon Record
+     */
+    InternalRow deserialize(Object data) {
+        return (InternalRow) fieldDeserializer.value(data);
+    }
+
+    private HiveDeserializer(HiveSchema schema, ObjectInspectorPair pair) {
+        this.fieldDeserializer = DeserializerVisitor.visit(schema, pair);
+    }
+
+    private static class DeserializerVisitor
+            extends SchemaVisitor<ObjectInspectorPair, FieldDeserializer> {
+
+        public static FieldDeserializer visit(HiveSchema schema, 
ObjectInspectorPair pair) {
+            return visit(
+                    schema,
+                    new SchemaNameMappingObjectInspectorPair(schema, pair),
+                    new DeserializerVisitor(),
+                    new PartnerObjectInspectorByNameAccessors());
+        }
+
+        @Override
+        public FieldDeserializer primitive(DataType type, ObjectInspectorPair 
pair) {
+            return o -> {
+                if (o == null) {
+                    return null;
+                }
+
+                ObjectInspector writerFieldInspector = pair.writerInspector();
+                ObjectInspector sourceFieldInspector = pair.sourceInspector();
+
+                Object result =
+                        ((PrimitiveObjectInspector) 
sourceFieldInspector).getPrimitiveJavaObject(o);
+                if (writerFieldInspector instanceof WriteableObjectInspector) {
+                    result = ((WriteableObjectInspector) 
writerFieldInspector).convert(result);
+                }
+
+                return result;
+            };
+        }
+
+        @Override
+        public FieldDeserializer rowType(
+                RowType type, ObjectInspectorPair pair, 
List<FieldDeserializer> deserializers) {
+            Preconditions.checkNotNull(type, "Can not create deserializer for 
null type");
+
+            return o -> {
+                if (o == null) {
+                    return null;
+                }
+
+                List<Object> data =
+                        ((StructObjectInspector) pair.sourceInspector())
+                                .getStructFieldsDataAsList(o);
+
+                GenericRow row = new GenericRow(data.size());
+
+                for (int i = 0; i < data.size(); i++) {
+                    Object fieldValue = data.get(i);
+                    if (fieldValue != null) {
+                        row.setField(i, 
deserializers.get(i).value(fieldValue));
+                    } else {
+                        row.setField(i, null);
+                    }
+                }
+
+                return row;
+            };
+        }
+
+        @Override
+        public FieldDeserializer list(
+                ArrayType type, ObjectInspectorPair pair, FieldDeserializer 
deserializer) {
+            return o -> {
+                if (o == null) {
+                    return null;
+                }
+
+                List<Object> result = Lists.newArrayList();
+                ListObjectInspector listInspector = (ListObjectInspector) 
pair.sourceInspector();
+                for (Object val : listInspector.getList(o)) {
+                    result.add(deserializer.value(val));
+                }
+                return new HivePaimonArray(type.getElementType(), result);
+            };
+        }
+
+        @Override
+        public FieldDeserializer map(
+                MapType mapType,
+                ObjectInspectorPair pair,
+                FieldDeserializer keyDeserializer,
+                FieldDeserializer valueDeserializer) {
+            return o -> {
+                if (o == null) {
+                    return null;
+                }
+                List<Object> keys = new ArrayList<>();
+                List<Object> values = new ArrayList<>();
+                MapObjectInspector mapObjectInspector = (MapObjectInspector) 
pair.sourceInspector();
+                Map<?, ?> map = mapObjectInspector.getMap(o);
+                for (Map.Entry<?, ?> entry : map.entrySet()) {
+                    keys.add(keyDeserializer.value(entry.getKey()));
+                    values.add(valueDeserializer.value(entry.getValue()));
+                }
+                HivePaimonArray key = new 
HivePaimonArray(mapType.getKeyType(), keys);
+                HivePaimonArray value = new 
HivePaimonArray(mapType.getValueType(), values);
+                return new InternalMap() {
+                    @Override
+                    public int size() {
+                        return map.size();
+                    }
+
+                    @Override
+                    public InternalArray keyArray() {
+                        return key;
+                    }
+
+                    @Override
+                    public InternalArray valueArray() {
+                        return value;
+                    }
+                };
+            };
+        }
+    }
+
+    private static class PartnerObjectInspectorByNameAccessors
+            implements SchemaVisitor.PartnerAccessors<ObjectInspectorPair> {
+
+        @Override
+        public ObjectInspectorPair fieldPartner(ObjectInspectorPair pair, 
String name) {
+            String sourceName = pair.sourceName(name);
+            return new ObjectInspectorPair(
+                    ((StructObjectInspector) pair.writerInspector())
+                            .getStructFieldRef(name)
+                            .getFieldObjectInspector(),
+                    ((StructObjectInspector) pair.sourceInspector())
+                            .getStructFieldRef(sourceName)
+                            .getFieldObjectInspector());
+        }
+
+        @Override
+        public ObjectInspectorPair mapKeyPartner(ObjectInspectorPair pair) {
+            return new ObjectInspectorPair(
+                    ((MapObjectInspector) 
pair.writerInspector()).getMapKeyObjectInspector(),
+                    ((MapObjectInspector) 
pair.sourceInspector()).getMapKeyObjectInspector());
+        }
+
+        @Override
+        public ObjectInspectorPair mapValuePartner(ObjectInspectorPair pair) {
+            return new ObjectInspectorPair(
+                    ((MapObjectInspector) 
pair.writerInspector()).getMapValueObjectInspector(),
+                    ((MapObjectInspector) 
pair.sourceInspector()).getMapValueObjectInspector());
+        }
+
+        @Override
+        public ObjectInspectorPair listPartner(ObjectInspectorPair pair) {
+            return new ObjectInspectorPair(
+                    ((ListObjectInspector) 
pair.writerInspector()).getListElementObjectInspector(),
+                    ((ListObjectInspector) 
pair.sourceInspector()).getListElementObjectInspector());
+        }
+    }
+
+    private interface FieldDeserializer {
+        Object value(Object object);
+    }
+
+    /**
+     * The column name in the Hive query result does not match the actual 
column name. Therefore, we
+     * need to convert the column name.
+     */
+    private static class SchemaNameMappingObjectInspectorPair extends 
ObjectInspectorPair {
+        private final Map<String, String> sourceNameMap;
+
+        SchemaNameMappingObjectInspectorPair(HiveSchema schema, 
ObjectInspectorPair pair) {
+            super(pair.writerInspector(), pair.sourceInspector());
+
+            this.sourceNameMap = 
Maps.newHashMapWithExpectedSize(schema.fields().size());
+
+            List<? extends StructField> fields =
+                    ((StructObjectInspector) 
sourceInspector()).getAllStructFieldRefs();
+            for (int i = 0; i < schema.fields().size(); ++i) {
+                sourceNameMap.put(schema.fields().get(i).name(), 
fields.get(i).getFieldName());
+            }
+        }
+
+        @Override
+        String sourceName(String originalName) {
+            return sourceNameMap.get(originalName);
+        }
+    }
+
+    /**
+     * To get the data for Paimon {@link GenericRow}s we have to use both 
ObjectInspectors.
+     *
+     * <p>We use the Hive ObjectInspectors (sourceInspector) to get the Hive 
primitive types.
+     *
+     * <p>We use the Paimon ObjectInspectors (writerInspector) to generate the 
correct type for
+     * Paimon Records.
+     */
+    private static class ObjectInspectorPair {
+        private ObjectInspector writerInspector;
+        private ObjectInspector sourceInspector;
+
+        ObjectInspectorPair(ObjectInspector writerInspector, ObjectInspector 
sourceInspector) {
+            this.writerInspector = writerInspector;
+            this.sourceInspector = sourceInspector;
+        }
+
+        ObjectInspector writerInspector() {
+            return writerInspector;
+        }
+
+        ObjectInspector sourceInspector() {
+            return sourceInspector;
+        }
+
+        String sourceName(String originalName) {
+            return originalName;
+        }
+    }
+}
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
index 72501baf7..5be40daf6 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
@@ -26,6 +26,7 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -48,8 +49,15 @@ public class HiveSchema {
 
     private final TableSchema tableSchema;
 
+    private final RowType rowType;
+
     private HiveSchema(TableSchema tableSchema) {
         this.tableSchema = tableSchema;
+        this.rowType = new RowType(tableSchema.fields());
+    }
+
+    public RowType rowType() {
+        return rowType;
     }
 
     public List<String> fieldNames() {
@@ -60,6 +68,10 @@ public class HiveSchema {
         return tableSchema.logicalRowType().getFieldTypes();
     }
 
+    public List<DataField> fields() {
+        return tableSchema.fields();
+    }
+
     public List<String> fieldComments() {
         return tableSchema.fields().stream()
                 .map(DataField::description)
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonJobConf.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonJobConf.java
index 62cecde57..971bba50f 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonJobConf.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonJobConf.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.hive;
 
+import org.apache.paimon.hive.mapred.PaimonOutputCommitter;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.utils.JsonSerdeUtil;
 
@@ -38,6 +39,10 @@ public class PaimonJobConf {
     private static final String INTERNAL_LOCATION = "paimon.internal.location";
     private static final String INTERNAL_CATALOG_CONFIG = 
"paimon.catalog.config";
 
+    public static final String MAPRED_OUTPUT_COMMITTER = 
"mapred.output.committer.class";
+
+    public static final String PAIMON_WRITE = "paimon.write";
+
     private static final String PAIMON_PREFIX = "paimon.";
 
     private final JobConf jobConf;
@@ -56,6 +61,16 @@ public class PaimonJobConf {
                 
properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION));
     }
 
+    public static void configureOutputJobProperties(
+            Configuration configuration, Properties properties, Map<String, 
String> map) {
+        map.put(
+                INTERNAL_LOCATION,
+                
properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION));
+        map.put(MAPRED_OUTPUT_COMMITTER, 
PaimonOutputCommitter.class.getName());
+        map.put(PAIMON_WRITE, Boolean.TRUE.toString());
+        properties.put(PAIMON_WRITE, Boolean.TRUE.toString());
+    }
+
     public String getLocation() {
         return jobConf.get(INTERNAL_LOCATION);
     }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonSerDe.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonSerDe.java
index 3bf5bd9ae..c67231924 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonSerDe.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonSerDe.java
@@ -20,15 +20,19 @@ package org.apache.paimon.hive;
 
 import org.apache.paimon.hive.objectinspector.PaimonInternalRowObjectInspector;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.Writable;
 
 import javax.annotation.Nullable;
 
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -39,11 +43,18 @@ import java.util.Properties;
 public class PaimonSerDe extends AbstractSerDe {
 
     private PaimonInternalRowObjectInspector inspector;
+    private HiveSchema tableSchema;
+
+    private final RowDataContainer rowData = new RowDataContainer();
+
+    private final Map<ObjectInspector, HiveDeserializer> deserializers =
+            Maps.newHashMapWithExpectedSize(1);
 
     @Override
     public void initialize(@Nullable Configuration configuration, Properties 
properties)
             throws SerDeException {
         HiveSchema schema = HiveSchema.extract(configuration, properties);
+        this.tableSchema = schema;
         inspector =
                 new PaimonInternalRowObjectInspector(
                         schema.fieldNames(), schema.fieldTypes(), 
schema.fieldComments());
@@ -56,8 +67,19 @@ public class PaimonSerDe extends AbstractSerDe {
 
     @Override
     public Writable serialize(Object o, ObjectInspector objectInspector) 
throws SerDeException {
-        throw new UnsupportedOperationException(
-                "PaimonSerDe currently only supports deserialization.");
+        HiveDeserializer deserializer = deserializers.get(objectInspector);
+        if (deserializer == null) {
+            deserializer =
+                    new HiveDeserializer.Builder()
+                            .schema(tableSchema)
+                            .sourceInspector((StructObjectInspector) 
objectInspector)
+                            .writerInspector(inspector)
+                            .build();
+            deserializers.put(objectInspector, deserializer);
+        }
+
+        rowData.set(deserializer.deserialize(o));
+        return rowData;
     }
 
     @Override
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
index 1a3cf8bbe..2fe0e9e49 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.hive;
 
 import org.apache.paimon.hive.mapred.PaimonInputFormat;
+import org.apache.paimon.hive.mapred.PaimonOutputCommitter;
 import org.apache.paimon.hive.mapred.PaimonOutputFormat;
 
 import org.apache.hadoop.conf.Configuration;
@@ -39,6 +40,9 @@ import org.apache.hadoop.mapred.OutputFormat;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.paimon.hive.PaimonJobConf.MAPRED_OUTPUT_COMMITTER;
+import static org.apache.paimon.hive.PaimonJobConf.PAIMON_WRITE;
+
 /** {@link HiveStorageHandler} for paimon. This is the entrance class of Hive 
API. */
 public class PaimonStorageHandler implements HiveStoragePredicateHandler, 
HiveStorageHandler {
 
@@ -78,13 +82,23 @@ public class PaimonStorageHandler implements 
HiveStoragePredicateHandler, HiveSt
     public void configureInputJobCredentials(TableDesc tableDesc, Map<String, 
String> map) {}
 
     @Override
-    public void configureOutputJobProperties(TableDesc tableDesc, Map<String, 
String> map) {}
+    public void configureOutputJobProperties(TableDesc tableDesc, Map<String, 
String> map) {
+        Properties properties = tableDesc.getProperties();
+        PaimonJobConf.configureOutputJobProperties(conf, properties, map);
+    }
 
     @Override
     public void configureTableJobProperties(TableDesc tableDesc, Map<String, 
String> map) {}
 
     @Override
-    public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {}
+    public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+        if (tableDesc != null
+                && tableDesc.getProperties() != null
+                && tableDesc.getProperties().get(PAIMON_WRITE) != null) {
+
+            jobConf.set(MAPRED_OUTPUT_COMMITTER, 
PaimonOutputCommitter.class.getName());
+        }
+    }
 
     @Override
     public void setConf(Configuration configuration) {
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/SchemaVisitor.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/SchemaVisitor.java
new file mode 100644
index 000000000..c2fba0ad1
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/SchemaVisitor.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * SchemaVisitor to visit the sourceInspector and writerInspector to get the 
FieldDeserializer.
+ */
+public abstract class SchemaVisitor<P, R> {
+
+    /** Resolves the partner object of a child type from its parent's partner. */
+    public interface PartnerAccessors<P> {
+
+        P fieldPartner(P partnerRow, String name);
+
+        P mapKeyPartner(P partnerMap);
+
+        P mapValuePartner(P partnerMap);
+
+        P listPartner(P partnerList);
+    }
+
+    public static <P, T> T visit(
+            HiveSchema schema,
+            P partner,
+            SchemaVisitor<P, T> visitor,
+            PartnerAccessors<P> accessors) {
+        return visit(schema.rowType(), partner, visitor, accessors);
+    }
+
+    public static <P, T> T visit(
+            DataType type, P partner, SchemaVisitor<P, T> visitor, 
PartnerAccessors<P> accessors) {
+        switch (type.getTypeRoot()) {
+            case ROW:
+                List<DataField> fields = ((RowType) type).getFields();
+                List<T> results = 
Lists.newArrayListWithExpectedSize(fields.size());
+                for (DataField field : fields) {
+                    P fieldPartner =
+                            partner != null ? accessors.fieldPartner(partner, 
field.name()) : null;
+                    T result;
+                    result = visit(field.type(), fieldPartner, visitor, 
accessors);
+
+                    results.add(result);
+                }
+                return visitor.rowType((RowType) type, partner, results);
+
+            case ARRAY:
+                ArrayType list = (ArrayType) type;
+                T result;
+                DataType elementType = list.getElementType();
+                P partnerElement = partner != null ? 
accessors.listPartner(partner) : null;
+                result = visit(elementType, partnerElement, visitor, 
accessors);
+
+                return visitor.list(list, partner, result);
+
+            case MAP:
+                MapType map = (MapType) type;
+                T keyResult;
+                T valueResult;
+
+                P keyPartner = partner != null ? 
accessors.mapKeyPartner(partner) : null;
+                keyResult = visit(map.getKeyType(), keyPartner, visitor, 
accessors);
+
+                P valuePartner = partner != null ? 
accessors.mapValuePartner(partner) : null;
+                valueResult = visit(map.getValueType(), valuePartner, visitor, 
accessors);
+                return visitor.map(map, partner, keyResult, valueResult);
+
+            default:
+                return visitor.primitive(type, partner);
+        }
+    }
+
+    public R rowType(RowType rowType, P partner, List<R> fieldResults) {
+        return null;
+    }
+
+    public R list(ArrayType list, P partner, R elementResult) {
+        return null;
+    }
+
+    public R map(MapType map, P partner, R keyResult, R valueResult) {
+        return null;
+    }
+
+    public R primitive(DataType primitive, P partner) {
+        return null;
+    }
+}
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
index 411ea0d03..e25ba3c2c 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
@@ -18,22 +18,12 @@
 
 package org.apache.paimon.hive.mapred;
 
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.hive.PaimonJobConf;
 import org.apache.paimon.hive.RowDataContainer;
-import org.apache.paimon.hive.SearchArgumentToPredicateConverter;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.ReadBuilder;
 
-import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
@@ -43,7 +33,9 @@ import org.apache.hadoop.mapred.Reporter;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Optional;
+
+import static org.apache.paimon.hive.utils.HiveUtils.createFileStoreTable;
+import static org.apache.paimon.hive.utils.HiveUtils.createPredicate;
 
 /**
  * {@link InputFormat} for paimon. It divides all files into {@link 
InputSplit}s (one split per
@@ -75,27 +67,6 @@ public class PaimonInputFormat implements InputFormat<Void, 
RowDataContainer> {
                 Arrays.asList(getSelectedColumns(jobConf)));
     }
 
-    private FileStoreTable createFileStoreTable(JobConf jobConf) {
-        PaimonJobConf wrapper = new PaimonJobConf(jobConf);
-        Options options = PaimonJobConf.extractCatalogConfig(jobConf);
-        options.set(CoreOptions.PATH, wrapper.getLocation());
-        CatalogContext catalogContext = CatalogContext.create(options, 
jobConf);
-        return FileStoreTableFactory.create(catalogContext);
-    }
-
-    private Optional<Predicate> createPredicate(TableSchema tableSchema, 
JobConf jobConf) {
-        SearchArgument sarg = ConvertAstToSearchArg.createFromConf(jobConf);
-        if (sarg == null) {
-            return Optional.empty();
-        }
-        SearchArgumentToPredicateConverter converter =
-                new SearchArgumentToPredicateConverter(
-                        sarg,
-                        tableSchema.fieldNames(),
-                        tableSchema.logicalRowType().getFieldTypes());
-        return converter.convert();
-    }
-
     private String[] getSelectedColumns(JobConf jobConf) {
         // when using tez engine or when same table is joined multiple times,
         // it is possible that some selected columns are duplicated
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java
new file mode 100644
index 000000000..221447e61
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.mapred;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.hive.utils.HiveUtils.createFileStoreTable;
+
+/** A Paimon table committer for adding data files to the Paimon table. */
+public class PaimonOutputCommitter extends OutputCommitter {
+
+    private static final String PRE_COMMIT = ".preCommit";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PaimonOutputCommitter.class);
+
+    /** No-op: this committer does nothing at job setup. */
+    @Override
+    public void setupJob(JobContext jobContext) throws IOException {}
+
+    /** No-op: this committer does nothing at task setup. */
+    @Override
+    public void setupTask(TaskAttemptContext taskAttemptContext) throws 
IOException {}
+
+    /**
+     * Tells whether this task attempt has output to commit.
+     *
+     * @return {@code true} for reduce tasks, and for any task of a job without reducers
+     */
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
+        // We need to commit if this is the last phase of a MapReduce process
+        TaskType taskType = taskAttemptContext.getTaskAttemptID().getTaskID().getTaskType();
+        if (TaskType.REDUCE.equals(taskType)) {
+            return true;
+        }
+        return taskAttemptContext.getJobConf().getNumReduceTasks() == 0;
+    }
+
+    /**
+     * Prepares the commit of the writer registered for this task attempt and stores the
+     * resulting commit messages in the task's preCommit file. The writers registered for the
+     * attempt are unregistered afterwards.
+     */
+    @Override
+    public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+        TaskAttemptID attemptID = taskAttemptContext.getTaskAttemptID();
+        JobConf jobConf = taskAttemptContext.getJobConf();
+        FileStoreTable table = createFileStoreTable(jobConf);
+
+        Optional<Map<String, PaimonRecordWriter>> registered =
+                Optional.ofNullable(PaimonRecordWriter.getWriters(attemptID));
+        if (!registered.isPresent()) {
+            LOG.info(
+                    "CommitTask found no writers for output table: {}, attemptID: {}",
+                    table.name(),
+                    attemptID);
+        }
+        Map<String, PaimonRecordWriter> writers = registered.orElseGet(() -> ImmutableMap.of());
+
+        PaimonRecordWriter writer = writers.get(table.name());
+        if (writer == null) {
+            LOG.info(
+                    "CommitTask found no writer for specific table: {}, attemptID: {}",
+                    table.name(),
+                    attemptID);
+        } else {
+            // try-with-resources closes the table write once its messages are persisted
+            try (BatchTableWrite batchTableWrite = writer.batchTableWrite()) {
+                List<CommitMessage> commitTables = batchTableWrite.prepareCommit();
+                String preCommitFile =
+                        generatePreCommitFileLocation(
+                                table.location().getPath(),
+                                attemptID.getJobID(),
+                                attemptID.getTaskID().getId());
+                createPreCommitFile(commitTables, preCommitFile, table.fileIO());
+            } catch (Exception e) {
+                LOG.error(
+                        "CommitTask prepareCommit error for specific table: {}, attemptID: {}",
+                        table.name(),
+                        attemptID);
+                throw new RuntimeException(e);
+            }
+        }
+        PaimonRecordWriter.removeWriters(attemptID);
+    }
+
+    /** Unregisters the writers of the aborted task attempt and closes each of them. */
+    @Override
+    public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
+        Map<String, PaimonRecordWriter> removed =
+                PaimonRecordWriter.removeWriters(taskAttemptContext.getTaskAttemptID());
+        if (removed == null) {
+            return;
+        }
+
+        // close writer and delete files
+        removed.values().forEach(writer -> writer.close(true));
+    }
+
+    /**
+     * Commits, in one batch commit, the commit messages that the tasks stored in their
+     * preCommit files, then deletes the job's temporary location.
+     */
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+        JobConf jobConf = jobContext.getJobConf();
+
+        long startTime = System.currentTimeMillis();
+        LOG.info("CommitJob {} has started", jobContext.getJobID());
+        FileStoreTable table = createFileStoreTable(jobConf);
+
+        if (table == null) {
+            LOG.info("CommitJob not found table, Skipping job commit.");
+        } else {
+            BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
+            String tableLocation = table.location().getPath();
+            List<CommitMessage> preCommitted =
+                    getAllPreCommitMessage(tableLocation, jobContext, table.fileIO());
+            try (BatchTableCommit batchTableCommit = batchWriteBuilder.newCommit()) {
+                batchTableCommit.commit(preCommitted);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            // The temporary files are only removed once the commit went through.
+            deleteTemporaryFile(
+                    jobContext, generateJobLocation(tableLocation, jobContext.getJobID()));
+        }
+
+        long elapsedMillis = System.currentTimeMillis() - startTime;
+        LOG.info("Commit took {} ms for job {}", elapsedMillis, jobContext.getJobID());
+    }
+
+    /**
+     * Aborts the commit messages that the tasks stored in their preCommit files, then deletes
+     * the job's temporary location.
+     */
+    @Override
+    public void abortJob(JobContext jobContext, int status) throws IOException {
+        FileStoreTable table = createFileStoreTable(jobContext.getJobConf());
+        if (table == null) {
+            return;
+        }
+
+        LOG.info("AbortJob {} has started", jobContext.getJobID());
+        String tableLocation = table.location().getPath();
+        // NOTE(review): reading fails with a RuntimeException when a preCommit file cannot be
+        // read, in which case neither the abort nor the cleanup below runs - confirm this is
+        // acceptable for jobs aborted before every task wrote its file.
+        List<CommitMessage> preCommitted =
+                getAllPreCommitMessage(tableLocation, jobContext, table.fileIO());
+        BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
+        try (BatchTableCommit batchTableCommit = batchWriteBuilder.newCommit()) {
+            batchTableCommit.abort(preCommitted);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        deleteTemporaryFile(
+                jobContext, generateJobLocation(tableLocation, jobContext.getJobID()));
+        LOG.info("Job {} is aborted. preCommit file has deleted", jobContext.getJobID());
+    }
+
+    /**
+     * Deletes the job's temporary location on a best-effort basis: a failure to delete is
+     * logged as a warning and is not propagated to the caller.
+     *
+     * @param jobContext The job context
+     * @param location The location to clean up
+     * @throws IOException kept in the signature for compatibility; deletion failures are caught
+     */
+    private void deleteTemporaryFile(JobContext jobContext, String location) throws IOException {
+        JobConf jobConf = jobContext.getJobConf();
+
+        LOG.info("Deleting temporary file for job {} started", jobContext.getJobID());
+
+        LOG.info("The deleted file is located in : {}", location);
+        try {
+            org.apache.hadoop.fs.Path deleteFilePath = new org.apache.hadoop.fs.Path(location);
+            FileSystem fs = deleteFilePath.getFileSystem(jobConf);
+            fs.delete(deleteFilePath, true);
+        } catch (IOException e) {
+            // Cleanup must not fail the job, but a leftover directory should be visible in the
+            // logs, hence WARN instead of DEBUG.
+            LOG.warn("Failed to delete directory {}", location, e);
+        }
+        LOG.info("Deleting temporary file for job {} finished", jobContext.getJobID());
+    }
+
+    /**
+     * Collects the commit messages stored in the preCommit files of a job's tasks.
+     *
+     * @param location The location of the table
+     * @param jobContext The job context
+     * @param io The FileIO used for reading the preCommit files
+     * @return The commit messages read from all preCommit files
+     */
+    private static List<CommitMessage> getAllPreCommitMessage(
+            String location, JobContext jobContext, FileIO io) {
+        JobConf conf = jobContext.getJobConf();
+
+        // One preCommit file is read per reduce task, or per map task when the job is
+        // configured without reducers.
+        int reduceTasks = conf.getNumReduceTasks();
+        int expectedFiles = reduceTasks > 0 ? reduceTasks : conf.getNumMapTasks();
+
+        List<CommitMessage> collected = Collections.synchronizedList(new ArrayList<>());
+        int taskId = 0;
+        while (taskId < expectedFiles) {
+            String preCommitFile =
+                    generatePreCommitFileLocation(location, jobContext.getJobID(), taskId);
+            collected.addAll(readPreCommitFile(preCommitFile, io));
+            taskId++;
+        }
+
+        return collected;
+    }
+
+    /**
+     * Builds the job's temporary location, {@code <location>/temp/<jobId>}.
+     *
+     * @param location The location of the table
+     * @param jobId The JobID for the task
+     * @return The temporary location of the job
+     */
+    static String generateJobLocation(String location, JobID jobId) {
+        return String.join("/", location, "temp", String.valueOf(jobId));
+    }
+
+    /**
+     * Builds the location of the preCommit file of one task, inside the job's temporary
+     * location. To keep the commit atomic the preCommit results are persisted by the tasks,
+     * restored in {@link PaimonOutputCommitter#commitJob(JobContext)} for the final commit, and
+     * deleted at the end.
+     *
+     * @param location The location of the table
+     * @param jobId jobId
+     * @param taskId taskId
+     * @return The location of preCommit file path
+     */
+    private static String generatePreCommitFileLocation(String location, JobID jobId, int taskId) {
+        String jobLocation = generateJobLocation(location, jobId);
+        String fileName = "task_" + taskId + PRE_COMMIT;
+        return jobLocation + "/" + fileName;
+    }
+
+    /**
+     * Creates a temporary preCommit file storing the result of {@link
+     * BatchTableWrite#prepareCommit()}.
+     *
+     * @param commitTables The commit messages produced by the table's prepareCommit
+     * @param location The location of the temporary file
+     * @param io The FileIO of the table
+     * @throws IOException if the file cannot be created or written
+     */
+    private static void createPreCommitFile(
+            List<CommitMessage> commitTables, String location, FileIO io) throws IOException {
+        // The raw stream is its own resource so that it is closed even when the
+        // ObjectOutputStream constructor, which already writes a stream header, throws.
+        try (java.io.OutputStream rawOutput = io.newOutputStream(new Path(location), true);
+                ObjectOutputStream objectOutputStream = new ObjectOutputStream(rawOutput)) {
+            objectOutputStream.writeObject(commitTables);
+        }
+    }
+
+    /**
+     * Reads back the commit messages stored in a preCommit file.
+     *
+     * @param location The location of the preCommit file
+     * @param io The FileIO used for reading the file
+     * @return The commit messages stored in the file
+     * @throws RuntimeException if the file cannot be read or deserialized; the original
+     *     exception is attached as the cause
+     */
+    @SuppressWarnings("unchecked")
+    private static List<CommitMessage> readPreCommitFile(String location, FileIO io) {
+        // The raw stream is its own resource so that it is closed even when the
+        // ObjectInputStream constructor, which already reads the stream header, throws.
+        try (java.io.InputStream rawInput = io.newInputStream(new Path(location));
+                ObjectInputStream objectInputStream = new ObjectInputStream(rawInput)) {
+            return (List<CommitMessage>) objectInputStream.readObject();
+        } catch (ClassNotFoundException | IOException e) {
+            throw new RuntimeException(
+                    String.format("Can not read or parse CommitMessage file: %s", location), e);
+        }
+    }
+}
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
index 6259fcb71..d40cf3781 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
@@ -18,27 +18,70 @@
 
 package org.apache.paimon.hive.mapred;
 
-import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.hive.RowDataContainer;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
 
-/** {@link OutputFormat} for table split. Currently useless. */
-public class PaimonOutputFormat implements OutputFormat<InternalRow, 
InternalRow> {
+import static org.apache.paimon.hive.utils.HiveUtils.createFileStoreTable;
+
+/** {@link OutputFormat} for table split. */
+public class PaimonOutputFormat
+        implements OutputFormat<NullWritable, RowDataContainer>,
+                HiveOutputFormat<NullWritable, RowDataContainer> {
+
+    private static final String TASK_ATTEMPT_ID_KEY = 
"mapreduce.task.attempt.id";
 
     @Override
-    public RecordWriter<InternalRow, InternalRow> getRecordWriter(
+    public RecordWriter<NullWritable, RowDataContainer> getRecordWriter(
             FileSystem fileSystem, JobConf jobConf, String s, Progressable 
progressable)
             throws IOException {
-        throw new UnsupportedOperationException(
-                "Paimon currently can only be used as an input format for 
Hive.");
+        return writer(jobConf);
     }
 
     @Override
     public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf) 
throws IOException {}
+
+    /**
+     * Creates the record writer for Hive. Only {@code jobConf} is used; the remaining arguments
+     * are ignored.
+     */
+    @Override
+    public FileSinkOperator.RecordWriter getHiveRecordWriter(
+            JobConf jobConf,
+            Path path,
+            Class<? extends Writable> aClass,
+            boolean b,
+            Properties properties,
+            Progressable progressable)
+            throws IOException {
+        return writer(jobConf);
+    }
+
+    /**
+     * Builds a {@link PaimonRecordWriter} for the task attempt named in the job configuration,
+     * writing to a copy of the table that has the write-only option switched on.
+     */
+    private static PaimonRecordWriter writer(JobConf jobConf) {
+        String attemptIdValue = jobConf.get(TASK_ATTEMPT_ID_KEY);
+        TaskAttemptID attemptId = TaskAttemptID.forName(attemptIdValue);
+        FileStoreTable configuredTable = createFileStoreTable(jobConf);
+
+        // force write-only = true
+        Map<String, String> writeOnlyOption =
+                Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), Boolean.TRUE.toString());
+        FileStoreTable writeOnlyTable = configuredTable.copy(writeOnlyOption);
+
+        BatchWriteBuilder writeBuilder = writeOnlyTable.newBatchWriteBuilder();
+        BatchTableWrite tableWrite = writeBuilder.newWrite();
+        return new PaimonRecordWriter(tableWrite, attemptId, writeOnlyTable.name());
+    }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordWriter.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordWriter.java
new file mode 100644
index 000000000..15a5f78cb
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonRecordWriter.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.paimon.hive.mapred;
+
+import org.apache.paimon.hive.RowDataContainer;
+import org.apache.paimon.table.sink.BatchTableWrite;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Record writer that forwards Hive rows to a Paimon {@link BatchTableWrite}. Every instance
+ * registers itself in a static registry, keyed by task attempt and table name, from which
+ * {@link PaimonOutputCommitter} fetches it when the task is committed or aborted.
+ */
+class PaimonRecordWriter
+        implements FileSinkOperator.RecordWriter,
+                org.apache.hadoop.mapred.RecordWriter<NullWritable, RowDataContainer> {
+    private static final Logger LOG = LoggerFactory.getLogger(PaimonRecordWriter.class);
+
+    // Registered writers per task attempt, keyed by table name. The nested map is there to
+    // support writing multiple tables from one task in the future.
+    private static final Map<TaskAttemptID, Map<String, PaimonRecordWriter>> writers =
+            Maps.newConcurrentMap();
+
+    private final BatchTableWrite batchTableWrite;
+
+    /**
+     * Creates the writer and registers it under the given task attempt and table name.
+     *
+     * @param batchTableWrite the table write that receives the rows
+     * @param taskAttemptID the task attempt this writer belongs to
+     * @param tableName the name of the written table
+     */
+    public PaimonRecordWriter(
+            BatchTableWrite batchTableWrite, TaskAttemptID taskAttemptID, String tableName) {
+        this.batchTableWrite = batchTableWrite;
+        // computeIfAbsent creates and returns the per-attempt map in one atomic step;
+        // putIfAbsent followed by get() could see null if the entry got removed in between.
+        writers.computeIfAbsent(taskAttemptID, id -> Maps.newConcurrentMap())
+                .put(tableName, this);
+    }
+
+    /** Unregisters and returns the writers of a task attempt, or {@code null} if there are none. */
+    static Map<String, PaimonRecordWriter> removeWriters(TaskAttemptID taskAttemptID) {
+        return writers.remove(taskAttemptID);
+    }
+
+    /** Returns the writers registered for a task attempt, or {@code null} if there are none. */
+    static Map<String, PaimonRecordWriter> getWriters(TaskAttemptID taskAttemptID) {
+        return writers.get(taskAttemptID);
+    }
+
+    /**
+     * Writes the row held by the given {@link RowDataContainer}.
+     *
+     * @throws RuntimeException wrapping any exception raised by the underlying table write
+     */
+    @Override
+    public void write(Writable row) throws IOException {
+        try {
+            batchTableWrite.write(((RowDataContainer) row).get());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void write(NullWritable key, RowDataContainer value) throws IOException {
+        write(value);
+    }
+
+    /**
+     * Closes the underlying table write only when aborting. On a regular close the table write
+     * stays open; {@link PaimonOutputCommitter#commitTask} closes it after preparing the commit.
+     */
+    @Override
+    public void close(boolean abort) {
+        if (abort) {
+            try {
+                batchTableWrite.close();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public void close(Reporter reporter) {
+        close(false);
+    }
+
+    /** Returns the underlying table write. */
+    public BatchTableWrite batchTableWrite() {
+        return batchTableWrite;
+    }
+}
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/HivePaimonArray.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/HivePaimonArray.java
new file mode 100644
index 000000000..6d453b3fa
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/HivePaimonArray.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.objectinspector;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+
+import java.util.List;
+
+/** HivePaimonArray for Array type. */
+/** {@link InternalArray} view over a {@link List} of already converted element values. */
+public class HivePaimonArray implements InternalArray {
+
+    private final DataType elementType;
+    private final List<Object> list;
+
+    public HivePaimonArray(DataType elementType, List<Object> list) {
+        this.list = list;
+        this.elementType = elementType;
+    }
+
+    @Override
+    public int size() {
+        return list.size();
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+        return list.get(i) == null;
+    }
+
+    /** Returns the backing list itself, not a copy. */
+    public List<Object> getList() {
+        return list;
+    }
+
+    /** Returns the element at the given position, cast to the type the caller expects. */
+    @SuppressWarnings("unchecked")
+    private <T> T elementAt(int position) {
+        return (T) list.get(position);
+    }
+
+    @Override
+    public boolean getBoolean(int i) {
+        return this.<Boolean>elementAt(i);
+    }
+
+    @Override
+    public byte getByte(int i) {
+        return this.<Byte>elementAt(i);
+    }
+
+    @Override
+    public short getShort(int i) {
+        return this.<Short>elementAt(i);
+    }
+
+    @Override
+    public int getInt(int i) {
+        return this.<Integer>elementAt(i);
+    }
+
+    @Override
+    public long getLong(int i) {
+        return this.<Long>elementAt(i);
+    }
+
+    @Override
+    public float getFloat(int i) {
+        return this.<Float>elementAt(i);
+    }
+
+    @Override
+    public double getDouble(int i) {
+        return this.<Double>elementAt(i);
+    }
+
+    @Override
+    public BinaryString getString(int i) {
+        return elementAt(i);
+    }
+
+    @Override
+    public Decimal getDecimal(int i, int precision, int scale) {
+        return elementAt(i);
+    }
+
+    @Override
+    public Timestamp getTimestamp(int i, int precision) {
+        return elementAt(i);
+    }
+
+    @Override
+    public byte[] getBinary(int i) {
+        return elementAt(i);
+    }
+
+    @Override
+    public InternalArray getArray(int i) {
+        // The nested array is re-wrapped with the element type of this array's element type.
+        DataType nestedElementType = ((ArrayType) elementType).getElementType();
+        HivePaimonArray nested = elementAt(i);
+        return new HivePaimonArray(nestedElementType, nested.getList());
+    }
+
+    @Override
+    public InternalMap getMap(int i) {
+        return elementAt(i);
+    }
+
+    @Override
+    public InternalRow getRow(int i, int i1) {
+        return elementAt(i);
+    }
+
+    @Override
+    public boolean[] toBooleanArray() {
+        int length = size();
+        boolean[] values = new boolean[length];
+        for (int pos = 0; pos < length; pos++) {
+            values[pos] = getBoolean(pos);
+        }
+        return values;
+    }
+
+    @Override
+    public byte[] toByteArray() {
+        int length = size();
+        byte[] values = new byte[length];
+        for (int pos = 0; pos < length; pos++) {
+            values[pos] = getByte(pos);
+        }
+        return values;
+    }
+
+    @Override
+    public short[] toShortArray() {
+        int length = size();
+        short[] values = new short[length];
+        for (int pos = 0; pos < length; pos++) {
+            values[pos] = getShort(pos);
+        }
+        return values;
+    }
+
+    @Override
+    public int[] toIntArray() {
+        int length = size();
+        int[] values = new int[length];
+        for (int pos = 0; pos < length; pos++) {
+            values[pos] = getInt(pos);
+        }
+        return values;
+    }
+
+    @Override
+    public long[] toLongArray() {
+        int length = size();
+        long[] values = new long[length];
+        for (int pos = 0; pos < length; pos++) {
+            values[pos] = getLong(pos);
+        }
+        return values;
+    }
+
+    @Override
+    public float[] toFloatArray() {
+        int length = size();
+        float[] values = new float[length];
+        for (int pos = 0; pos < length; pos++) {
+            values[pos] = getFloat(pos);
+        }
+        return values;
+    }
+
+    @Override
+    public double[] toDoubleArray() {
+        int length = size();
+        double[] values = new double[length];
+        for (int pos = 0; pos < length; pos++) {
+            values[pos] = getDouble(pos);
+        }
+        return values;
+    }
+}
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonCharObjectInspector.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonCharObjectInspector.java
index 97ef6bbad..13c2009f9 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonCharObjectInspector.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonCharObjectInspector.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 /** {@link AbstractPrimitiveJavaObjectInspector} for CHAR type. */
 public class PaimonCharObjectInspector extends 
AbstractPrimitiveJavaObjectInspector
-        implements HiveCharObjectInspector {
+        implements HiveCharObjectInspector, WriteableObjectInspector {
 
     private final int len;
 
@@ -59,4 +59,9 @@ public class PaimonCharObjectInspector extends 
AbstractPrimitiveJavaObjectInspec
             return o;
         }
     }
+
+    /** Converts a value to a {@link BinaryString} through its {@code toString()} form. */
+    @Override
+    public BinaryString convert(Object value) {
+        if (value == null) {
+            return null;
+        }
+        return BinaryString.fromString(value.toString());
+    }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java
index f11ef4990..a8eee30f7 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java
@@ -26,10 +26,11 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspect
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 import java.sql.Date;
+import java.time.LocalDate;
 
 /** {@link AbstractPrimitiveJavaObjectInspector} for DATE type. */
 public class PaimonDateObjectInspector extends 
AbstractPrimitiveJavaObjectInspector
-        implements DateObjectInspector {
+        implements DateObjectInspector, WriteableObjectInspector {
 
     public PaimonDateObjectInspector() {
         super(TypeInfoFactory.dateTypeInfo);
@@ -57,4 +58,16 @@ public class PaimonDateObjectInspector extends 
AbstractPrimitiveJavaObjectInspec
             return o;
         }
     }
+
+    /**
+     * Converts a date value to its internal integer form. A {@link Date} is converted as such;
+     * any other non-null value is cast to {@link LocalDate}.
+     */
+    @Override
+    public Integer convert(Object value) {
+        if (value == null) {
+            return null;
+        }
+        return value instanceof Date
+                ? DateTimeUtils.toInternal((Date) value)
+                : DateTimeUtils.toInternal((LocalDate) value);
+    }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDecimalObjectInspector.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDecimalObjectInspector.java
index 83e876c7b..e01c5cd86 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDecimalObjectInspector.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDecimalObjectInspector.java
@@ -26,9 +26,11 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitive
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
+import java.math.BigDecimal;
+
 /** {@link AbstractPrimitiveJavaObjectInspector} for DECIMAL type. */
 public class PaimonDecimalObjectInspector extends 
AbstractPrimitiveJavaObjectInspector
-        implements HiveDecimalObjectInspector {
+        implements HiveDecimalObjectInspector, WriteableObjectInspector {
 
     public PaimonDecimalObjectInspector(int precision, int scale) {
         super(TypeInfoFactory.getDecimalTypeInfo(precision, scale));
@@ -56,4 +58,17 @@ public class PaimonDecimalObjectInspector extends 
AbstractPrimitiveJavaObjectIns
             return o;
         }
     }
+
+    /**
+     * Converts a {@link HiveDecimal} to a {@link Decimal} whose scale is this inspector's
+     * scale.
+     */
+    @Override
+    public Decimal convert(Object o) {
+        if (o == null) {
+            return null;
+        }
+
+        // during the HiveDecimal to BigDecimal conversion the scale is lost, when the value is
+        // 0, so the scale of this inspector is applied again
+        BigDecimal rescaled = ((HiveDecimal) o).bigDecimalValue().setScale(scale());
+        return Decimal.fromBigDecimal(rescaled, rescaled.precision(), rescaled.scale());
+    }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java
index 8dd5bcb85..774291688 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java
@@ -21,15 +21,20 @@ package org.apache.paimon.hive.objectinspector;
 import org.apache.paimon.hive.HiveTypeUtils;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
 
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 /** Factory to create {@link ObjectInspector}s according to the given {@link 
DataType}. */
 public class PaimonObjectInspectorFactory {
 
@@ -70,6 +75,15 @@ public class PaimonObjectInspectorFactory {
             case MAP:
                 MapType mapType = (MapType) logicalType;
                 return new PaimonMapObjectInspector(mapType.getKeyType(), 
mapType.getValueType());
+            case ROW:
+                List<String> fieldComments =
+                        ((RowType) logicalType)
+                                .getFields().stream()
+                                        .map(DataField::description)
+                                        .collect(Collectors.toList());
+                List<DataType> fieldTypes = ((RowType) 
logicalType).getFieldTypes();
+                List<String> fieldNames = ((RowType) 
logicalType).getFieldNames();
+                return new PaimonInternalRowObjectInspector(fieldNames, 
fieldTypes, fieldComments);
             default:
                 throw new UnsupportedOperationException(
                         "Unsupported logical type " + 
logicalType.asSQLString());
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonStringObjectInspector.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonStringObjectInspector.java
index 8917bcafd..9679aebd3 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonStringObjectInspector.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonStringObjectInspector.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.io.Text;
 
 /** {@link AbstractPrimitiveJavaObjectInspector} for STRING type. */
 public class PaimonStringObjectInspector extends 
AbstractPrimitiveJavaObjectInspector
-        implements StringObjectInspector {
+        implements StringObjectInspector, WriteableObjectInspector {
 
     public PaimonStringObjectInspector() {
         super(TypeInfoFactory.stringTypeInfo);
@@ -52,4 +52,9 @@ public class PaimonStringObjectInspector extends 
AbstractPrimitiveJavaObjectInsp
             return o;
         }
     }
+
+    /**
+     * Converts a value into a Paimon {@link BinaryString} using its {@code toString()} form.
+     *
+     * @param value value to convert, may be {@code null}
+     * @return the string form of {@code value} as a {@link BinaryString}, or {@code null} if
+     *     {@code value} is {@code null}
+     */
+    @Override
+    public BinaryString convert(Object value) {
+        return value == null ? null : 
BinaryString.fromString(value.toString());
+    }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonTimestampObjectInspector.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonTimestampObjectInspector.java
index 0c47475e5..7b3ae41d7 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonTimestampObjectInspector.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonTimestampObjectInspector.java
@@ -25,9 +25,11 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitive
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
+import java.time.LocalDateTime;
+
 /** {@link AbstractPrimitiveJavaObjectInspector} for TIMESTAMP type. */
 public class PaimonTimestampObjectInspector extends 
AbstractPrimitiveJavaObjectInspector
-        implements TimestampObjectInspector {
+        implements TimestampObjectInspector, WriteableObjectInspector {
 
     public PaimonTimestampObjectInspector() {
         super(TypeInfoFactory.timestampTypeInfo);
@@ -56,4 +58,16 @@ public class PaimonTimestampObjectInspector extends 
AbstractPrimitiveJavaObjectI
             return o;
         }
     }
+
+    /**
+     * Converts a Hive-side timestamp object into a Paimon {@link Timestamp}.
+     *
+     * <p>{@code java.sql.Timestamp} values go through {@code Timestamp.fromSQLTimestamp}; every
+     * other non-null value is cast to {@link LocalDateTime} without a type check, so an
+     * unexpected type fails with {@link ClassCastException}.
+     *
+     * @param value value to convert, may be {@code null}
+     * @return the converted timestamp, or {@code null} if {@code value} is {@code null}
+     */
+    @Override
+    public Timestamp convert(Object value) {
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof java.sql.Timestamp) {
+            return Timestamp.fromSQLTimestamp((java.sql.Timestamp) value);
+        } else {
+            return Timestamp.fromLocalDateTime((LocalDateTime) value);
+        }
+    }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonVarcharObjectInspector.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonVarcharObjectInspector.java
index 03ce70c8b..8a6a31c3e 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonVarcharObjectInspector.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonVarcharObjectInspector.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 /** {@link AbstractPrimitiveJavaObjectInspector} for VARCHAR type. */
 public class PaimonVarcharObjectInspector extends 
AbstractPrimitiveJavaObjectInspector
-        implements HiveVarcharObjectInspector {
+        implements HiveVarcharObjectInspector, WriteableObjectInspector {
 
     private final int len;
 
@@ -59,4 +59,9 @@ public class PaimonVarcharObjectInspector extends 
AbstractPrimitiveJavaObjectIns
             return o;
         }
     }
+
+    /**
+     * Converts a value into a Paimon {@link BinaryString} using its {@code toString()} form.
+     *
+     * @param value value to convert, may be {@code null}
+     * @return the string form of {@code value} as a {@link BinaryString}, or {@code null} if
+     *     {@code value} is {@code null}
+     */
+    @Override
+    public BinaryString convert(Object value) {
+        return value == null ? null : 
BinaryString.fromString(value.toString());
+    }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/WriteableObjectInspector.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/WriteableObjectInspector.java
new file mode 100644
index 000000000..2e05dbaed
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/WriteableObjectInspector.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.paimon.hive.objectinspector;
+
+/** Interface for converting Hive primitive values into the corresponding Paimon data values. */
+public interface WriteableObjectInspector {
+    /**
+     * Converts a Hive-side value into its Paimon representation.
+     *
+     * @param value the Hive-side value to convert
+     * @return the Paimon-side value
+     */
+    Object convert(Object value);
+}
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
new file mode 100644
index 000000000..c2630bfc1
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.hive.PaimonJobConf;
+import org.apache.paimon.hive.SearchArgumentToPredicateConverter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+
+import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** Utilities for creating a {@link FileStoreTable} and a {@link Predicate} from a Hive job. */
+public class HiveUtils {
+
+    // NOTE(review): not referenced by any method of this class.
+    private static final Logger LOG = LoggerFactory.getLogger(HiveUtils.class);
+
+    /**
+     * Creates the {@link FileStoreTable} described by the given job configuration.
+     *
+     * <p>Catalog options are extracted from {@code jobConf}, and {@link CoreOptions#PATH} is set
+     * to the location reported by the {@link PaimonJobConf} wrapper before the table is created.
+     *
+     * @param jobConf Hive job configuration carrying the table location and catalog options
+     * @return the table created by {@link FileStoreTableFactory}
+     */
+    public static FileStoreTable createFileStoreTable(JobConf jobConf) {
+        PaimonJobConf wrapper = new PaimonJobConf(jobConf);
+        Options options = PaimonJobConf.extractCatalogConfig(jobConf);
+        options.set(CoreOptions.PATH, wrapper.getLocation());
+        CatalogContext catalogContext = CatalogContext.create(options, 
jobConf);
+        return FileStoreTableFactory.create(catalogContext);
+    }
+
+    /**
+     * Converts the search argument stored in the job configuration into a Paimon predicate.
+     *
+     * @param tableSchema schema supplying the field names and field types used for conversion
+     * @param jobConf job configuration that may carry a Hive search argument
+     * @return the result of the converter, or {@link Optional#empty()} when {@code jobConf}
+     *     yields no search argument
+     */
+    public static Optional<Predicate> createPredicate(TableSchema tableSchema, 
JobConf jobConf) {
+        SearchArgument sarg = ConvertAstToSearchArg.createFromConf(jobConf);
+        // no filter was pushed down for this job
+        if (sarg == null) {
+            return Optional.empty();
+        }
+        SearchArgumentToPredicateConverter converter =
+                new SearchArgumentToPredicateConverter(
+                        sarg,
+                        tableSchema.fieldNames(),
+                        tableSchema.logicalRowType().getFieldTypes());
+        return converter.convert();
+    }
+}
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FileStoreTestUtils.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FileStoreTestUtils.java
index 12dce9978..016aced0e 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FileStoreTestUtils.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/FileStoreTestUtils.java
@@ -33,15 +33,26 @@ import java.util.List;
 /** Test utils related to {@link FileStore}. */
 public class FileStoreTestUtils {
 
-    private static final String TABLE_NAME = "hive_test_table";
+    public static final String TABLE_NAME = "hive_test_table";
 
-    private static final String DATABASE_NAME = "default";
+    public static final String DATABASE_NAME = "default";
 
     private static final Identifier TABLE_IDENTIFIER = 
Identifier.create(DATABASE_NAME, TABLE_NAME);
 
+    /**
+     * Creates a table under the default {@code TABLE_IDENTIFIER} by delegating to the
+     * five-argument overload with a {@code null} identifier.
+     */
     public static Table createFileStoreTable(
             Options conf, RowType rowType, List<String> partitionKeys, 
List<String> primaryKeys)
             throws Exception {
+        return createFileStoreTable(conf, rowType, partitionKeys, primaryKeys, 
null);
+    }
+
+    public static Table createFileStoreTable(
+            Options conf,
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            Identifier identifier)
+            throws Exception {
+        Identifier identifierNotNull = identifier == null ? TABLE_IDENTIFIER : 
identifier;
         // create CatalogContext using the options
         CatalogContext catalogContext = CatalogContext.create(conf);
         Catalog catalog = CatalogFactory.createCatalog(catalogContext);
@@ -49,10 +60,10 @@ public class FileStoreTestUtils {
         catalog.createDatabase(DATABASE_NAME, false);
         // create table
         catalog.createTable(
-                TABLE_IDENTIFIER,
+                identifierNotNull,
                 new Schema(rowType.getFields(), partitionKeys, primaryKeys, 
conf.toMap(), ""),
                 false);
-        Table table = catalog.getTable(TABLE_IDENTIFIER);
+        Table table = catalog.getTable(identifierNotNull);
         catalog.close();
         return table;
     }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
new file mode 100644
index 000000000..69bc691dc
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
@@ -0,0 +1,1126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.WriteMode;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.hive.mapred.PaimonOutputFormat;
+import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.StringUtils;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.hive.FileStoreTestUtils.DATABASE_NAME;
+import static org.apache.paimon.hive.FileStoreTestUtils.TABLE_NAME;
+import static 
org.apache.paimon.hive.RandomGenericRowDataGenerator.randomBigDecimal;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link PaimonStorageHandler} and {@link PaimonOutputFormat}. 
*/
+@RunWith(PaimonEmbeddedHiveRunner.class)
+public class HiveWriteITCase {
+
+    // Class-level temporary directory; each helper takes a fresh sub-folder as its warehouse.
+    @ClassRule public static TemporaryFolder folder = new TemporaryFolder();
+
+    // Never assigned in this class; presumably injected by the Hive runner - TODO confirm.
+    @HiveSQL(files = {})
+    private static HiveShell hiveShell;
+
+    // Execution engine name, assigned in beforeClass().
+    private static String engine;
+
+    // Both are reset in before(); commitIdentifier is advanced after every commit in writeData().
+    private String commitUser;
+    private long commitIdentifier;
+
+    /** Records the execution engine for this test class; only "mr" is used. */
+    @BeforeClass
+    public static void beforeClass() {
+        // only run with mr
+        engine = "mr";
+    }
+
+    /**
+     * Prepares each test: selects the "mr" execution engine, creates and switches to the {@code
+     * test_db} database, and resets the commit user and commit identifier.
+     */
+    @Before
+    public void before() {
+        hiveShell.execute("SET hive.execution.engine=mr");
+
+        hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
+        hiveShell.execute("USE test_db");
+
+        commitUser = UUID.randomUUID().toString();
+        commitIdentifier = 0;
+    }
+
+    /** Drops the {@code test_db} database, and everything in it, after each test. */
+    @After
+    public void after() {
+        hiveShell.execute("DROP DATABASE IF EXISTS test_db CASCADE");
+    }
+
+    /**
+     * Same as the five-argument overload, passing an empty table name so that the default
+     * {@code TABLE_NAME} is used for the Paimon table.
+     */
+    private String createChangelogExternalTable(
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            List<InternalRow> data)
+            throws Exception {
+
+        return createChangelogExternalTable(rowType, partitionKeys, 
primaryKeys, data, "");
+    }
+
+    /**
+     * Creates a Paimon table with primary keys in a fresh temporary warehouse, writes {@code
+     * data} into it and registers it in Hive as an external table.
+     *
+     * <p>The table uses 2 buckets and the AVRO file format; the write mode is not set here.
+     *
+     * @param rowType schema of the table
+     * @param partitionKeys partition key field names
+     * @param primaryKeys primary key field names
+     * @param data rows written before the Hive table is registered
+     * @param tableName Paimon table name; {@code TABLE_NAME} is used when null or blank
+     * @return the name of the Hive external table, as returned by {@code writeData}
+     */
+    private String createChangelogExternalTable(
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            List<InternalRow> data,
+            String tableName)
+            throws Exception {
+        String path = folder.newFolder().toURI().toString();
+        String tableNameNotNull =
+                StringUtils.isNullOrWhitespaceOnly(tableName) ? TABLE_NAME : 
tableName;
+        String tablePath = String.format("%s/default.db/%s", path, 
tableNameNotNull);
+        Options conf = new Options();
+        conf.set(CatalogOptions.WAREHOUSE, path);
+        conf.set(CoreOptions.BUCKET, 2);
+        conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+        Identifier identifier = Identifier.create(DATABASE_NAME, 
tableNameNotNull);
+        Table table =
+                FileStoreTestUtils.createFileStoreTable(
+                        conf, rowType, partitionKeys, primaryKeys, identifier);
+
+        return writeData(table, tablePath, data);
+    }
+
+    /**
+     * Same as the four-argument overload, passing an empty table name so that the default
+     * {@code TABLE_NAME} is used for the Paimon table.
+     */
+    private String createAppendOnlyExternalTable(
+            RowType rowType, List<String> partitionKeys, List<InternalRow> 
data) throws Exception {
+        return createAppendOnlyExternalTable(rowType, partitionKeys, data, "");
+    }
+
+    /**
+     * Creates an append-only Paimon table (no primary keys) in a fresh temporary warehouse,
+     * writes {@code data} into it and registers it in Hive as an external table.
+     *
+     * <p>The table uses 2 buckets, the AVRO file format and {@code WriteMode.APPEND_ONLY}.
+     *
+     * @param rowType schema of the table
+     * @param partitionKeys partition key field names
+     * @param data rows written before the Hive table is registered
+     * @param tableName Paimon table name; {@code TABLE_NAME} is used when null or blank
+     * @return the name of the Hive external table, as returned by {@code writeData}
+     */
+    private String createAppendOnlyExternalTable(
+            RowType rowType, List<String> partitionKeys, List<InternalRow> 
data, String tableName)
+            throws Exception {
+        String path = folder.newFolder().toURI().toString();
+        String tableNameNotNull =
+                StringUtils.isNullOrWhitespaceOnly(tableName) ? TABLE_NAME : 
tableName;
+        String tablePath = String.format("%s/default.db/%s", path, 
tableNameNotNull);
+        Options conf = new Options();
+        conf.set(CatalogOptions.WAREHOUSE, path);
+        conf.set(CoreOptions.BUCKET, 2);
+        conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+        conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+        Identifier identifier = Identifier.create(DATABASE_NAME, 
tableNameNotNull);
+        Table table =
+                FileStoreTestUtils.createFileStoreTable(
+                        conf, rowType, partitionKeys, Collections.emptyList(), 
identifier);
+
+        return writeData(table, tablePath, data);
+    }
+
+    /**
+     * Writes {@code data} into {@code table} and registers the table location in Hive as an
+     * external table.
+     *
+     * <p>After each row a commit is issued with probability 1/5, so the rows end up spread over
+     * a random number of commits; a final commit is always issued after the last row. Both the
+     * writer and the committer are closed before the Hive table is created.
+     *
+     * @param table Paimon table to write into
+     * @param path location used in the Hive {@code CREATE EXTERNAL TABLE} statement
+     * @param data rows to write, may be empty
+     * @return the randomly generated name of the Hive external table
+     */
+    private String writeData(Table table, String path, List<InternalRow> data) 
throws Exception {
+        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+        StreamTableWrite write = streamWriteBuilder.newWrite();
+        StreamTableCommit commit = streamWriteBuilder.newCommit();
+        for (InternalRow rowData : data) {
+            write.write(rowData);
+            if (ThreadLocalRandom.current().nextInt(5) == 0) {
+                commit.commit(commitIdentifier, write.prepareCommit(false, 
commitIdentifier));
+                commitIdentifier++;
+            }
+        }
+        commit.commit(commitIdentifier, write.prepareCommit(true, 
commitIdentifier));
+        commitIdentifier++;
+        write.close();
+        // release the committer as well; previously only the writer was closed
+        commit.close();
+
+        String tableName = "test_table_" + 
(UUID.randomUUID().toString().substring(0, 4));
+        hiveShell.execute(
+                String.join(
+                        "\n",
+                        Arrays.asList(
+                                "CREATE EXTERNAL TABLE " + tableName + " ",
+                                "STORED BY '" + 
PaimonStorageHandler.class.getName() + "'",
+                                "LOCATION '" + path + "'")));
+        return tableName;
+    }
+
+    /**
+     * Inserts two literal rows through Hive into an empty, partitioned append-only table and
+     * checks that a subsequent {@code select *} returns exactly those rows.
+     */
+    @Test
+    public void testInsert() throws Exception {
+        List<InternalRow> emptyData = Collections.emptyList();
+
+        String outputTableName =
+                createAppendOnlyExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.INT(),
+                                    DataTypes.BIGINT(),
+                                    DataTypes.STRING()
+                                },
+                                new String[] {"pt", "a", "b", "c"}),
+                        Collections.singletonList("pt"),
+                        emptyData,
+                        "hive_test_table_output");
+
+        hiveShell.execute(
+                "insert into " + outputTableName + " values 
(1,2,3,'Hello'),(4,5,6,'Fine')");
+        List<String> select = hiveShell.executeQuery("select * from " + 
outputTableName);
+        // JUnit expects (expected, actual); the swapped order produced misleading failure text
+        Assert.assertEquals(Arrays.asList("1\t2\t3\tHello", "4\t5\t6\tFine"), select);
+    }
+
+    /**
+     * Runs five Hive inserts against a single-bucket {@code WriteMode.CHANGE_LOG} table with
+     * primary keys and checks that the first split of a stream scan plan reports snapshot id 5,
+     * i.e. that the inserts produced no additional compaction snapshot.
+     */
+    @Test
+    public void testWriteOnlyWithChangeLogTableOption() throws Exception {
+
+        String innerName = "hive_test_table_output";
+
+        String path = folder.newFolder().toURI().toString();
+        String tablePath = String.format("%s/default.db/%s", path, innerName);
+        Options conf = new Options();
+        conf.set(CatalogOptions.WAREHOUSE, path);
+        conf.set(CoreOptions.BUCKET, 1);
+        conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+        conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+        Identifier identifier = Identifier.create(DATABASE_NAME, innerName);
+        Table table =
+                FileStoreTestUtils.createFileStoreTable(
+                        conf,
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.INT(),
+                                    DataTypes.INT(),
+                                    DataTypes.STRING(),
+                                },
+                                new String[] {"pt", "a", "b", "c"}),
+                        Collections.singletonList("pt"),
+                        Arrays.asList("a", "pt"),
+                        identifier);
+        String tableName = "test_table_" + 
(UUID.randomUUID().toString().substring(0, 4));
+        hiveShell.execute(
+                String.join(
+                        "\n",
+                        Arrays.asList(
+                                "CREATE EXTERNAL TABLE " + tableName + " ",
+                                "STORED BY '" + 
PaimonStorageHandler.class.getName() + "'",
+                                "LOCATION '" + tablePath + "'")));
+        for (int i = 0; i < 5; i++) {
+            hiveShell.execute(
+                    "insert into " + tableName + " values 
(1,2,3,'Hello'),(4,5,6,'Fine')");
+        }
+        TableScan scan = table.newReadBuilder().newStreamScan();
+        DataSplit split = (DataSplit) scan.plan().splits().get(0);
+        // no compact snapshot
+        // JUnit expects (expected, actual); the swapped order produced misleading failure text
+        Assert.assertEquals(5L, split.snapshotId());
+    }
+
+    /**
+     * Runs {@code maxCompact} Hive inserts against a single-bucket append-only table whose
+     * {@code COMPACTION_MAX_FILE_NUM} is {@code maxCompact}, and checks that the first split of
+     * a stream scan plan reports snapshot id {@code maxCompact}, i.e. that the inserts produced
+     * no additional compaction snapshot.
+     */
+    @Test
+    public void testWriteOnlyWithAppendOnlyTableOption() throws Exception {
+
+        String innerName = "hive_test_table_output";
+        int maxCompact = 3;
+        String path = folder.newFolder().toURI().toString();
+        String tablePath = String.format("%s/default.db/%s", path, innerName);
+        Options conf = new Options();
+        conf.set(CatalogOptions.WAREHOUSE, path);
+        conf.set(CoreOptions.BUCKET, 1);
+        conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+        conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+        conf.set(CoreOptions.COMPACTION_MAX_FILE_NUM, maxCompact);
+        Identifier identifier = Identifier.create(DATABASE_NAME, innerName);
+        Table table =
+                FileStoreTestUtils.createFileStoreTable(
+                        conf,
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.INT(),
+                                    DataTypes.INT(),
+                                    DataTypes.STRING(),
+                                },
+                                new String[] {"pt", "a", "b", "c"}),
+                        Collections.singletonList("pt"),
+                        Collections.emptyList(),
+                        identifier);
+        String tableName = "test_table_" + 
(UUID.randomUUID().toString().substring(0, 4));
+        hiveShell.execute(
+                String.join(
+                        "\n",
+                        Arrays.asList(
+                                "CREATE EXTERNAL TABLE " + tableName + " ",
+                                "STORED BY '" + 
PaimonStorageHandler.class.getName() + "'",
+                                "LOCATION '" + tablePath + "'")));
+        for (int i = 0; i < maxCompact; i++) {
+            hiveShell.execute(
+                    "insert into " + tableName + " values 
(1,2,3,'Hello'),(4,5,6,'Fine')");
+        }
+        TableScan scan = table.newReadBuilder().newStreamScan();
+        DataSplit split = (DataSplit) scan.plan().splits().get(0);
+        // no compact snapshot
+        // JUnit expects (expected, actual); the swapped order produced misleading failure text
+        Assert.assertEquals(maxCompact, split.snapshotId());
+    }
+
+    /**
+     * Copies a partitioned primary-key table (partition key {@code pt}, primary keys {@code pt}
+     * and {@code a}) into an empty append-only table through Hive {@code INSERT ... SELECT},
+     * then checks that both tables return the same rows, ignoring order.
+     */
+    @Test
+    public void testInsertFromSelectWithPartitionWithPk() throws Exception {
+
+        // source rows include a repeated key (1, 10) and a DELETE record for key (1, 20)
+        List<InternalRow> data =
+                Arrays.asList(
+                        GenericRow.of(1, 10, 100L, 
BinaryString.fromString("Hi")),
+                        GenericRow.of(2, 10, 200L, 
BinaryString.fromString("Hello")),
+                        GenericRow.of(1, 20, 300L, 
BinaryString.fromString("World")),
+                        GenericRow.of(1, 10, 100L, BinaryString.fromString("Hi 
Again")),
+                        GenericRow.ofKind(
+                                RowKind.DELETE, 1, 20, 300L, 
BinaryString.fromString("World")),
+                        GenericRow.of(2, 20, 100L, null),
+                        GenericRow.of(1, 30, 200L, 
BinaryString.fromString("Store")));
+        List<InternalRow> emptyData = Collections.emptyList();
+        String tableName =
+                createChangelogExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.INT(),
+                                    DataTypes.BIGINT(),
+                                    DataTypes.STRING()
+                                },
+                                new String[] {"pt", "a", "b", "c"}),
+                        Collections.singletonList("pt"),
+                        Arrays.asList("pt", "a"),
+                        data);
+
+        String outputTableName =
+                createAppendOnlyExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.INT(),
+                                    DataTypes.BIGINT(),
+                                    DataTypes.STRING()
+                                },
+                                new String[] {"pt", "a", "b", "c"}),
+                        Collections.singletonList("pt"),
+                        emptyData,
+                        "hive_test_table_output");
+
+        hiveShell.execute("insert into " + outputTableName + " SELECT * FROM " 
+ tableName);
+        List<Object[]> select = hiveShell.executeStatement("select * from " + 
outputTableName);
+        List<Object[]> expect = hiveShell.executeStatement("select * from " + 
tableName);
+        
assertThat(select.toArray()).containsExactlyInAnyOrder(expect.toArray());
+    }
+
+    /**
+     * Copies an unpartitioned primary-key table (primary keys {@code a} and {@code b}, with a
+     * {@code DECIMAL(5, 3)} column) into an empty append-only table through Hive {@code INSERT
+     * ... SELECT}, then checks that both tables return the same rows, ignoring order.
+     */
+    @Test
+    public void testInsertFromSelectNoPartitionWithPk() throws Exception {
+
+        // source rows include a repeated key (1, 10) and a DELETE record for key (2, 30);
+        // the decimal column is filled with random values
+        List<InternalRow> data =
+                Arrays.asList(
+                        GenericRow.of(
+                                1,
+                                10L,
+                                BinaryString.fromString("Hi"),
+                                Decimal.fromBigDecimal(randomBigDecimal(5, 3), 
5, 3)),
+                        GenericRow.of(
+                                1,
+                                20L,
+                                BinaryString.fromString("Hello"),
+                                Decimal.fromBigDecimal(randomBigDecimal(5, 3), 
5, 3)),
+                        GenericRow.of(
+                                2,
+                                30L,
+                                BinaryString.fromString("World"),
+                                Decimal.fromBigDecimal(randomBigDecimal(5, 3), 
5, 3)),
+                        GenericRow.of(
+                                1,
+                                10L,
+                                BinaryString.fromString("Hi Again"),
+                                Decimal.fromBigDecimal(randomBigDecimal(5, 3), 
5, 3)),
+                        GenericRow.ofKind(
+                                RowKind.DELETE,
+                                2,
+                                30L,
+                                BinaryString.fromString("World"),
+                                Decimal.fromBigDecimal(randomBigDecimal(5, 3), 
5, 3)),
+                        GenericRow.of(
+                                2, 40L, null, 
Decimal.fromBigDecimal(randomBigDecimal(5, 3), 5, 3)),
+                        GenericRow.of(
+                                3,
+                                50L,
+                                BinaryString.fromString("Store"),
+                                Decimal.fromBigDecimal(randomBigDecimal(5, 3), 
5, 3)));
+
+        List<InternalRow> emptyData = Collections.emptyList();
+        String tableName =
+                createChangelogExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.BIGINT(),
+                                    DataTypes.STRING(),
+                                    DataTypes.DECIMAL(5, 3)
+                                },
+                                new String[] {"a", "b", "c", "d"}),
+                        Collections.emptyList(),
+                        Arrays.asList("a", "b"),
+                        data);
+
+        String outputTableName =
+                createAppendOnlyExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.BIGINT(),
+                                    DataTypes.STRING(),
+                                    DataTypes.DECIMAL(5, 3)
+                                },
+                                new String[] {"a", "b", "c", "d"}),
+                        Collections.emptyList(),
+                        emptyData,
+                        "hive_test_table_output");
+
+        hiveShell.execute("insert into " + outputTableName + " SELECT * FROM " 
+ tableName);
+        List<Object[]> select = hiveShell.executeStatement("select * from " + 
outputTableName);
+        List<Object[]> expect = hiveShell.executeStatement("select * from " + 
tableName);
+        
assertThat(select.toArray()).containsExactlyInAnyOrder(expect.toArray());
+    }
+
+    @Test
+    public void testInsertFromSelectWhereWithPartitionWithPk() throws 
Exception {
+        // Checks that INSERT ... SELECT ... WHERE a > 10 writes the same rows the SELECT returns.
+        List<InternalRow> data =
+                Arrays.asList(
+                        GenericRow.of(1, 10, 100L, 
BinaryString.fromString("Hi")),
+                        GenericRow.of(2, 10, 200L, 
BinaryString.fromString("Hello")),
+                        GenericRow.of(1, 20, 300L, 
BinaryString.fromString("World")),
+                        GenericRow.of(1, 10, 100L, BinaryString.fromString("Hi 
Again")),
+                        GenericRow.ofKind(
+                                RowKind.DELETE, 1, 20, 300L, 
BinaryString.fromString("World")),
+                        GenericRow.of(2, 20, 100L, null),
+                        GenericRow.of(1, 30, 200L, 
BinaryString.fromString("Store")));
+        List<InternalRow> emptyData = Collections.emptyList();
+        String tableName =
+                createChangelogExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.INT(),
+                                    DataTypes.BIGINT(),
+                                    DataTypes.STRING()
+                                },
+                                new String[] {"pt", "a", "b", "c"}),
+                        Collections.singletonList("pt"),
+                        Arrays.asList("pt", "a"),
+                        data);
+        // Empty destination with the same columns and the same ("pt") list, built by the append-only helper.
+        String outputTableName =
+                createAppendOnlyExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.INT(),
+                                    DataTypes.BIGINT(),
+                                    DataTypes.STRING()
+                                },
+                                new String[] {"pt", "a", "b", "c"}),
+                        Collections.singletonList("pt"),
+                        emptyData,
+                        "hive_test_table_output");
+        // Run the filtered copy, then read both sides back through Hive and compare ignoring row order.
+        hiveShell.execute(
+                "insert into " + outputTableName + " SELECT * FROM " + 
tableName + " where a > 10");
+        List<Object[]> select = hiveShell.executeStatement("select * from " + 
outputTableName);
+        List<Object[]> expect =
+                hiveShell.executeStatement("select * from " + tableName + " 
where a > 10");
+        
assertThat(select.toArray()).containsExactlyInAnyOrder(expect.toArray());
+    }
+
+    @Test
+    public void testInsertFromSelectOrderWithPartitionWithPk() throws 
Exception {
+        // Checks that INSERT ... SELECT ... ORDER BY b DESC writes the same rows the SELECT returns.
+        List<InternalRow> data =
+                Arrays.asList(
+                        GenericRow.of(1, 10, 100L, 
BinaryString.fromString("Hi")),
+                        GenericRow.of(2, 10, 200L, 
BinaryString.fromString("Hello")),
+                        GenericRow.of(1, 20, 300L, 
BinaryString.fromString("World")),
+                        GenericRow.of(1, 10, 100L, BinaryString.fromString("Hi 
Again")),
+                        GenericRow.ofKind(
+                                RowKind.DELETE, 1, 20, 300L, 
BinaryString.fromString("World")),
+                        GenericRow.of(2, 20, 100L, null),
+                        GenericRow.of(1, 30, 200L, 
BinaryString.fromString("Store")));
+        List<InternalRow> emptyData = Collections.emptyList();
+        String tableName =
+                createChangelogExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.INT(),
+                                    DataTypes.BIGINT(),
+                                    DataTypes.STRING()
+                                },
+                                new String[] {"pt", "a", "b", "c"}),
+                        Collections.singletonList("pt"),
+                        Arrays.asList("pt", "a"),
+                        data);
+        // Destination is also a changelog table, but its key list is ("pt", "b") instead of ("pt", "a").
+        String outputTableName =
+                createChangelogExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.INT(),
+                                    DataTypes.BIGINT(),
+                                    DataTypes.STRING()
+                                },
+                                new String[] {"pt", "a", "b", "c"}),
+                        Collections.singletonList("pt"),
+                        Arrays.asList("pt", "b"),
+                        emptyData,
+                        "hive_test_table_output");
+        // NOTE(review): the final comparison ignores order, so the ORDER BY itself is not verified here.
+        hiveShell.execute(
+                "insert into "
+                        + outputTableName
+                        + " SELECT * FROM "
+                        + tableName
+                        + " order by b desc");
+        List<Object[]> select = hiveShell.executeStatement("select * from " + 
outputTableName);
+        List<Object[]> expect =
+                hiveShell.executeStatement("select * from " + tableName + " 
order by b desc");
+        
assertThat(select.toArray()).containsExactlyInAnyOrder(expect.toArray());
+    }
+
+    @Test
+    public void testInsertFromJoiningWithPartitionWithPk() throws Exception {
+        // Checks that the rows produced by a LEFT JOIN on l.a = r.a can be inserted into the output table.
+        List<InternalRow> leftData =
+                Arrays.asList(
+                        GenericRow.of(1, 10, 100L, 
BinaryString.fromString("Hi")),
+                        GenericRow.of(2, 10, 200L, 
BinaryString.fromString("Hello")),
+                        GenericRow.of(1, 20, 300L, 
BinaryString.fromString("World")),
+                        GenericRow.of(1, 10, 100L, BinaryString.fromString("Hi 
Again")));
+        List<InternalRow> rightData =
+                Arrays.asList(
+                        GenericRow.of(1, 10, 1L, 
BinaryString.fromString("HZY")),
+                        GenericRow.of(2, 10, 2L, 
BinaryString.fromString("LN")),
+                        GenericRow.of(1, 20, 3L, 
BinaryString.fromString("GOOD")),
+                        GenericRow.of(1, 10, 4L, BinaryString.fromString("")));
+        List<InternalRow> emptyData = Collections.emptyList();
+        String leftTable =
+                createAppendOnlyExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.INT(),
+                                    DataTypes.BIGINT(),
+                                    DataTypes.STRING()
+                                },
+                                new String[] {"pt", "a", "b", "c"}),
+                        Collections.singletonList("pt"),
+                        leftData);
+        String rightTable =
+                createAppendOnlyExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.INT(),
+                                    DataTypes.BIGINT(),
+                                    DataTypes.STRING()
+                                },
+                                new String[] {"pt", "a", "b", "c"}),
+                        Collections.singletonList("pt"),
+                        rightData);
+        // NOTE(review): all three tables use createAppendOnlyExternalTable despite "WithPk" in the name.
+        String outputTableName =
+                createAppendOnlyExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.INT(),
+                                    DataTypes.BIGINT(),
+                                    DataTypes.STRING()
+                                },
+                                new String[] {"pt", "a", "b", "c"}),
+                        Collections.singletonList("pt"),
+                        emptyData,
+                        "hive_test_table_output");
+        // The same join is run twice: once as the INSERT source, once to produce the expected rows.
+        hiveShell.execute(
+                "insert into "
+                        + outputTableName
+                        + " SELECT r.pt as pt,l.a as a,l.b as b ,r.c as c FROM 
"
+                        + leftTable
+                        + " l left join "
+                        + rightTable
+                        + " r on l.a = r.a");
+        List<Object[]> select = hiveShell.executeStatement("select * from " + 
outputTableName);
+        List<Object[]> expect =
+                hiveShell.executeStatement(
+                        " SELECT r.pt as pt,l.a as a,l.b as b ,r.c as c FROM "
+                                + leftTable
+                                + " l left join "
+                                + rightTable
+                                + " r on l.a = r.a");
+        
assertThat(select.toArray()).containsExactlyInAnyOrder(expect.toArray());
+    }
+
+    @Test
+    public void testInsertAllSupportedTypes() throws Exception {
+        // Round-trips rows shaped like RandomGenericRowDataGenerator.ROW_TYPE through INSERT ... SELECT.
+        String root = folder.newFolder().toString();
+        String tablePath = String.format("%s/default.db/hive_test_table", 
root);
+        Options conf = new Options();
+        conf.set(CatalogOptions.WAREHOUSE, root);
+        conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+        Table table =
+                FileStoreTestUtils.createFileStoreTable(
+                        conf,
+                        RandomGenericRowDataGenerator.ROW_TYPE,
+                        Collections.emptyList(),
+                        Collections.singletonList("f_int"));
+        // Generates 0 to 9 rows, retrying each until field 3 is non-null. NOTE(review): 0 rows compares nothing.
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        List<GenericRow> input = new ArrayList<>();
+        for (int i = random.nextInt(10); i > 0; i--) {
+            while (true) {
+                // pk must not be null
+                GenericRow rowData = RandomGenericRowDataGenerator.generate();
+                if (!rowData.isNullAt(3)) {
+                    input.add(rowData);
+                    break;
+                }
+            }
+        }
+        // Write and commit the generated rows directly through the table's stream write builder.
+        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+        StreamTableWrite write = streamWriteBuilder.newWrite();
+        StreamTableCommit commit = streamWriteBuilder.newCommit();
+        for (GenericRow rowData : input) {
+            write.write(rowData);
+        }
+        commit.commit(0, write.prepareCommit(true, 0));
+        write.close();
+        // Register tablePath with Hive as external table "test_table", stored by PaimonStorageHandler.
+        hiveShell.execute(
+                String.join(
+                        "\n",
+                        Arrays.asList(
+                                "CREATE EXTERNAL TABLE test_table",
+                                "STORED BY '" + 
PaimonStorageHandler.class.getName() + "'",
+                                "LOCATION '" + tablePath + "'")));
+        // Copy into an empty changelog table created with the same ("f_int") list; compare ignoring order.
+        List<InternalRow> emptyData = Collections.emptyList();
+        String outputTableName =
+                createChangelogExternalTable(
+                        RandomGenericRowDataGenerator.ROW_TYPE,
+                        Collections.emptyList(),
+                        Collections.singletonList("f_int"),
+                        emptyData,
+                        "hive_test_table_output");
+        hiveShell.execute("insert into " + outputTableName + " SELECT * FROM 
test_table");
+        List<Object[]> select = hiveShell.executeStatement("select * from " + 
outputTableName);
+        List<Object[]> expect = hiveShell.executeStatement("select * from 
test_table");
+        
assertThat(select.toArray()).containsExactlyInAnyOrder(expect.toArray());
+    }
+
+    @Test
+    public void testInsertArrayOfPrimitiveType() throws Exception {
+        // Checks INSERT ... SELECT for ARRAY<CHAR(20)>, ARRAY<VARCHAR(100)> and ARRAY<DECIMAL(5, 3)> columns.
+        List<InternalRow> data =
+                Arrays.asList(
+                        GenericRow.of(
+                                1,
+                                new GenericArray(
+                                        Collections.singletonList(
+                                                        
BinaryString.fromString("xiaoyang"))
+                                                .toArray()),
+                                new GenericArray(
+                                        
Collections.singletonList(BinaryString.fromString("hi"))
+                                                .toArray()),
+                                new GenericArray(
+                                        Collections.singletonList(
+                                                        Decimal.fromBigDecimal(
+                                                                
randomBigDecimal(5, 3), 5, 3))
+                                                .toArray())),
+                        GenericRow.of(
+                                1,
+                                new GenericArray(
+                                        
Collections.singletonList(BinaryString.fromString("hzy"))
+                                                .toArray()),
+                                new GenericArray(
+                                        
Collections.singletonList(BinaryString.fromString("hello"))
+                                                .toArray()),
+                                new GenericArray(
+                                        Collections.singletonList(
+                                                        Decimal.fromBigDecimal(
+                                                                
randomBigDecimal(5, 3), 5, 3))
+                                                .toArray())));
+        // NOTE(review): both rows have a = 1 while "a" is the key list passed below; confirm this is intended.
+        List<InternalRow> emptyData = Collections.emptyList();
+        String tableName =
+                createChangelogExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.ARRAY(DataTypes.CHAR(20)),
+                                    DataTypes.ARRAY(DataTypes.VARCHAR(100)),
+                                    DataTypes.ARRAY(DataTypes.DECIMAL(5, 3))
+                                },
+                                new String[] {"a", "b", "c", "d"}),
+                        Collections.emptyList(),
+                        Arrays.asList("a"),
+                        data);
+        // Empty destination table with the same column types, built by the append-only helper.
+        String outputTableName =
+                createAppendOnlyExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.ARRAY(DataTypes.CHAR(20)),
+                                    DataTypes.ARRAY(DataTypes.VARCHAR(100)),
+                                    DataTypes.ARRAY(DataTypes.DECIMAL(5, 3))
+                                },
+                                new String[] {"a", "b", "c", "d"}),
+                        Collections.emptyList(),
+                        emptyData,
+                        "hive_test_table_output");
+        // Copy everything, then compare both tables as read back through Hive, ignoring row order.
+        hiveShell.execute("insert into " + outputTableName + " SELECT * FROM " 
+ tableName);
+        List<Object[]> select = hiveShell.executeStatement("select * from " + 
outputTableName);
+        List<Object[]> expect = hiveShell.executeStatement("select * from " + 
tableName);
+        
assertThat(select.toArray()).containsExactlyInAnyOrder(expect.toArray());
+    }
+
+    @Test
+    public void testInsertArrayOfArrayType() throws Exception {
+        // Checks INSERT ... SELECT for two-level and three-level nested ARRAY<STRING> columns.
+        List<InternalRow> data =
+                Arrays.asList(
+                        GenericRow.of(
+                                1,
+                                new GenericArray(
+                                        Collections.singletonList(
+                                                        new GenericArray(
+                                                                
Collections.singletonList(
+                                                                               
 BinaryString
+                                                                               
         .fromString(
+                                                                               
                 "xiaoyang"))
+                                                                        
.toArray()))
+                                                .toArray()),
+                                new GenericArray(
+                                        Collections.singletonList(
+                                                        new GenericArray(
+                                                                
Collections.singletonList(
+                                                                               
 new GenericArray(
+                                                                               
         Collections
+                                                                               
                 .singletonList(
+                                                                               
                         BinaryString
+                                                                               
                                 .fromString(
+                                                                               
                                         "xiaoyang"))
+                                                                               
                 .toArray()))
+                                                                        
.toArray()))
+                                                .toArray())));
+        // Source table: column "b" is ARRAY<ARRAY<STRING>>, column "c" adds one more ARRAY level.
+        List<InternalRow> emptyData = Collections.emptyList();
+        String tableName =
+                createChangelogExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING())),
+                                    DataTypes.ARRAY(
+                                            
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING())))
+                                },
+                                new String[] {"a", "b", "c"}),
+                        Collections.emptyList(),
+                        Arrays.asList("a"),
+                        data);
+        // Empty destination table with the same column types, built by the append-only helper.
+        String outputTableName =
+                createAppendOnlyExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING())),
+                                    DataTypes.ARRAY(
+                                            
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING())))
+                                },
+                                new String[] {"a", "b", "c"}),
+                        Collections.emptyList(),
+                        emptyData,
+                        "hive_test_table_output");
+        // Copy everything, then compare both tables as read back through Hive, ignoring row order.
+        hiveShell.execute("insert into " + outputTableName + " SELECT * FROM " 
+ tableName);
+        List<Object[]> select = hiveShell.executeStatement("select * from " + 
outputTableName);
+        List<Object[]> expect = hiveShell.executeStatement("select * from " + 
tableName);
+        
assertThat(select.toArray()).containsExactlyInAnyOrder(expect.toArray());
+    }
+
+    @Test
+    public void testInsertArrayOfMapType() throws Exception {
+        // Checks INSERT ... SELECT for an ARRAY<MAP<STRING, STRING>> column, using one single-entry map.
+        List<InternalRow> data =
+                Arrays.asList(
+                        GenericRow.of(
+                                1,
+                                new GenericArray(
+                                        Collections.singletonList(
+                                                        new GenericMap(
+                                                                
Collections.singletonMap(
+                                                                        
BinaryString.fromString(
+                                                                               
 "xiaoyang"),
+                                                                        
BinaryString.fromString(
+                                                                               
 "xiaolan"))))
+                                                .toArray())));
+        // Source table created through the changelog helper, with "a" as its only key-list entry.
+        List<InternalRow> emptyData = Collections.emptyList();
+        String tableName =
+                createChangelogExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.ARRAY(
+                                            DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING()))
+                                },
+                                new String[] {"a", "b"}),
+                        Collections.emptyList(),
+                        Arrays.asList("a"),
+                        data);
+        // Empty destination table with the same column types, built by the append-only helper.
+        String outputTableName =
+                createAppendOnlyExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.ARRAY(
+                                            DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING()))
+                                },
+                                new String[] {"a", "b"}),
+                        Collections.emptyList(),
+                        emptyData,
+                        "hive_test_table_output");
+        // Copy everything, then compare both tables as read back through Hive, ignoring row order.
+        hiveShell.execute("insert into " + outputTableName + " SELECT * FROM " 
+ tableName);
+        List<Object[]> select = hiveShell.executeStatement("select * from " + 
outputTableName);
+        List<Object[]> expect = hiveShell.executeStatement("select * from " + 
tableName);
+        
assertThat(select.toArray()).containsExactlyInAnyOrder(expect.toArray());
+    }
+
+    @Test
+    public void testInsertArrayOfRowType() throws Exception {
+        // Checks INSERT ... SELECT for an ARRAY of ROW values whose first field is itself a nested ROW.
+        List<InternalRow> data =
+                Arrays.asList(
+                        GenericRow.of(
+                                1,
+                                new GenericArray(
+                                        Collections.singletonList(
+                                                        GenericRow.of(
+                                                                GenericRow.of(
+                                                                        
BinaryString.fromString(
+                                                                               
 "xiaoyang")),
+                                                                
BinaryString.fromString("xiaolan")))
+                                                .toArray())));
+        // Element type is ROW<b1 ROW<b1_1 STRING>, b2 STRING>; DataField ids 2, 3 and 4 are given explicitly.
+        List<InternalRow> emptyData = Collections.emptyList();
+        String tableName =
+                createChangelogExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.ARRAY(
+                                            DataTypes.ROW(
+                                                    new DataField(
+                                                            2,
+                                                            "b1",
+                                                            DataTypes.ROW(
+                                                                    new 
DataField(
+                                                                            4,
+                                                                            
"b1_1",
+                                                                            
DataTypes.STRING()))),
+                                                    new DataField(3, "b2", 
DataTypes.STRING())))
+                                },
+                                new String[] {"a", "b"}),
+                        Collections.emptyList(),
+                        Arrays.asList("a"),
+                        data);
+        // Empty destination table declaring the identical nested type, built by the append-only helper.
+        String outputTableName =
+                createAppendOnlyExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.ARRAY(
+                                            DataTypes.ROW(
+                                                    new DataField(
+                                                            2,
+                                                            "b1",
+                                                            DataTypes.ROW(
+                                                                    new 
DataField(
+                                                                            4,
+                                                                            
"b1_1",
+                                                                            
DataTypes.STRING()))),
+                                                    new DataField(3, "b2", 
DataTypes.STRING())))
+                                },
+                                new String[] {"a", "b"}),
+                        Collections.emptyList(),
+                        emptyData,
+                        "hive_test_table_output");
+        // Copy everything, then compare both tables as read back through Hive, ignoring row order.
+        hiveShell.execute("insert into " + outputTableName + " SELECT * FROM " 
+ tableName);
+        List<Object[]> select = hiveShell.executeStatement("select * from " + 
outputTableName);
+        List<Object[]> expect = hiveShell.executeStatement("select * from " + 
tableName);
+        
assertThat(select.toArray()).containsExactlyInAnyOrder(expect.toArray());
+    }
+
+    @Test
+    public void testInsertMapOfPrimitiveType() throws Exception {
+        // Checks INSERT ... SELECT for a MAP<STRING, STRING> column, using one single-entry map.
+        List<InternalRow> data =
+                Arrays.asList(
+                        GenericRow.of(
+                                1,
+                                new GenericMap(
+                                        Collections.singletonMap(
+                                                
BinaryString.fromString("xiaoyang"),
+                                                
BinaryString.fromString("xiaolan")))));
+        // Source table created through the changelog helper, with "a" as its only key-list entry.
+        List<InternalRow> emptyData = Collections.emptyList();
+        String tableName =
+                createChangelogExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING())
+                                },
+                                new String[] {"a", "b"}),
+                        Collections.emptyList(),
+                        Arrays.asList("a"),
+                        data);
+        // Empty destination table with the same column types, built by the append-only helper.
+        String outputTableName =
+                createAppendOnlyExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING())
+                                },
+                                new String[] {"a", "b"}),
+                        Collections.emptyList(),
+                        emptyData,
+                        "hive_test_table_output");
+        // Copy everything, then compare both tables as read back through Hive, ignoring row order.
+        hiveShell.execute("insert into " + outputTableName + " SELECT * FROM " 
+ tableName);
+        List<Object[]> select = hiveShell.executeStatement("select * from " + 
outputTableName);
+        List<Object[]> expect = hiveShell.executeStatement("select * from " + 
tableName);
+        
assertThat(select.toArray()).containsExactlyInAnyOrder(expect.toArray());
+    }
+
+    @Test
+    public void testInsertMapOfArrayType() throws Exception {
+        // Checks INSERT ... SELECT for a MAP<STRING, ARRAY<STRING>> column, using one single-entry map.
+        List<InternalRow> data =
+                Arrays.asList(
+                        GenericRow.of(
+                                1,
+                                new GenericMap(
+                                        Collections.singletonMap(
+                                                
BinaryString.fromString("xiaoyang"),
+                                                new GenericArray(
+                                                        
Collections.singletonList(
+                                                                        
BinaryString.fromString(
+                                                                               
 "xiaoyang"))
+                                                                
.toArray())))));
+        // Source table created through the changelog helper, with "a" as its only key-list entry.
+        List<InternalRow> emptyData = Collections.emptyList();
+        String tableName =
+                createChangelogExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.MAP(
+                                            DataTypes.STRING(), 
DataTypes.ARRAY(DataTypes.STRING()))
+                                },
+                                new String[] {"a", "b"}),
+                        Collections.emptyList(),
+                        Arrays.asList("a"),
+                        data);
+        // Empty destination table with the same column types, built by the append-only helper.
+        String outputTableName =
+                createAppendOnlyExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.MAP(
+                                            DataTypes.STRING(), 
DataTypes.ARRAY(DataTypes.STRING()))
+                                },
+                                new String[] {"a", "b"}),
+                        Collections.emptyList(),
+                        emptyData,
+                        "hive_test_table_output");
+        // Copy everything, then compare both tables as read back through Hive, ignoring row order.
+        hiveShell.execute("insert into " + outputTableName + " SELECT * FROM " 
+ tableName);
+        List<Object[]> select = hiveShell.executeStatement("select * from " + 
outputTableName);
+        List<Object[]> expect = hiveShell.executeStatement("select * from " + 
tableName);
+        
assertThat(select.toArray()).containsExactlyInAnyOrder(expect.toArray());
+    }
+
+    @Test
+    public void testInsertMapOfMapType() throws Exception {
+
+        List<InternalRow> data =
+                Arrays.asList(
+                        GenericRow.of(
+                                1,
+                                new GenericMap(
+                                        Collections.singletonMap(
+                                                BinaryString.fromString("xiaolan"),
+                                                new GenericMap(
+                                                        Collections.singletonMap(
+                                                                BinaryString.fromString("xiaoyang"),
+                                                                BinaryString.fromString(
+                                                                        "xiaolan")))))));
+
+        List<InternalRow> emptyData = Collections.emptyList();
+        String tableName =
+                createChangelogExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.MAP(
+                                            DataTypes.STRING(),
+                                            DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
+                                },
+                                new String[] {"a", "b"}),
+                        Collections.emptyList(),
+                        Arrays.asList("a"),
+                        data);
+
+        String outputTableName =
+                createAppendOnlyExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.MAP(
+                                            DataTypes.STRING(),
+                                            DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
+                                },
+                                new String[] {"a", "b"}),
+                        Collections.emptyList(),
+                        emptyData,
+                        "hive_test_table_output");
+
+        hiveShell.execute("insert into " + outputTableName + " SELECT * FROM " + tableName);
+        List<Object[]> select = hiveShell.executeStatement("select * from " + outputTableName);
+        List<Object[]> expect = hiveShell.executeStatement("select * from " + tableName);
+        assertThat(select.toArray()).containsExactlyInAnyOrder(expect.toArray());
+    }
+
+    @Test
+    public void testInsertMapOfRowType() throws Exception {
+
+        List<InternalRow> data =
+                Arrays.asList(
+                        GenericRow.of(
+                                1,
+                                new GenericMap(
+                                        Collections.singletonMap(
+                                                BinaryString.fromString("xiaolan"),
+                                                GenericRow.of(
+                                                        GenericRow.of(
+                                                                BinaryString.fromString(
+                                                                        "xiaoyang")),
+                                                        BinaryString.fromString("xiaolan"))))));
+
+        List<InternalRow> emptyData = Collections.emptyList();
+        String tableName =
+                createChangelogExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.MAP(
+                                            DataTypes.STRING(),
+                                            DataTypes.ROW(
+                                                    new DataField(
+                                                            5,
+                                                            "c1",
+                                                            DataTypes.ROW(
+                                                                    new DataField(
+                                                                            6,
+                                                                            "c1_1",
+                                                                            DataTypes.STRING()))),
+                                                    new DataField(7, "c2", DataTypes.STRING())))
+                                },
+                                new String[] {"a", "b"}),
+                        Collections.emptyList(),
+                        Arrays.asList("a"),
+                        data);
+
+        String outputTableName =
+                createAppendOnlyExternalTable(
+                        RowType.of(
+                                new DataType[] {
+                                    DataTypes.INT(),
+                                    DataTypes.MAP(
+                                            DataTypes.STRING(),
+                                            DataTypes.ROW(
+                                                    new DataField(
+                                                            5,
+                                                            "c1",
+                                                            DataTypes.ROW(
+                                                                    new DataField(
+                                                                            6,
+                                                                            "c1_1",
+                                                                            DataTypes.STRING()))),
+                                                    new DataField(7, "c2", DataTypes.STRING())))
+                                },
+                                new String[] {"a", "b"}),
+                        Collections.emptyList(),
+                        emptyData,
+                        "hive_test_table_output");
+
+        hiveShell.execute("insert into " + outputTableName + " SELECT * FROM " + tableName);
+        List<Object[]> select = hiveShell.executeStatement("select * from " + outputTableName);
+        List<Object[]> expect = hiveShell.executeStatement("select * from " + tableName);
+        assertThat(select.toArray()).containsExactlyInAnyOrder(expect.toArray());
+    }
+}
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
index 760388d31..83bb74d89 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
@@ -20,6 +20,7 @@ package org.apache.paimon.hive;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.WriteMode;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -37,6 +38,7 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.StringUtils;
 
 import com.klarna.hiverunner.HiveShell;
 import com.klarna.hiverunner.annotations.HiveSQL;
@@ -64,6 +66,9 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static org.apache.paimon.hive.FileStoreTestUtils.DATABASE_NAME;
+import static org.apache.paimon.hive.FileStoreTestUtils.TABLE_NAME;
+
 /** IT cases for {@link PaimonStorageHandler} and {@link PaimonInputFormat}. */
 @RunWith(PaimonEmbeddedHiveRunner.class)
 public class PaimonStorageHandlerITCase {
@@ -82,7 +87,8 @@ public class PaimonStorageHandlerITCase {
     public static void beforeClass() {
         // TODO Currently FlinkEmbeddedHiveRunner can only be used for one test class,
         //  so we have to select engine randomly. Write our own Hive tester in the future.
-        engine = ThreadLocalRandom.current().nextBoolean() ? "mr" : "tez";
+        // engine = ThreadLocalRandom.current().nextBoolean() ? "mr" : "tez";
+        engine = "mr";
     }
 
     @Before
@@ -602,30 +608,54 @@ public class PaimonStorageHandlerITCase {
             List<String> primaryKeys,
             List<InternalRow> data)
             throws Exception {
+
+        return createChangelogExternalTable(rowType, partitionKeys, primaryKeys, data, "");
+    }
+
+    private String createChangelogExternalTable(
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            List<InternalRow> data,
+            String tableName)
+            throws Exception {
         String path = folder.newFolder().toURI().toString();
-        String tablePath = String.format("%s/default.db/hive_test_table", path);
+        String tableNameNotNull =
+                StringUtils.isNullOrWhitespaceOnly(tableName) ? TABLE_NAME : tableName;
+        String tablePath = String.format("%s/default.db/%s", path, tableNameNotNull);
         Options conf = new Options();
         conf.set(CatalogOptions.WAREHOUSE, path);
         conf.set(CoreOptions.BUCKET, 2);
         conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+        Identifier identifier = Identifier.create(DATABASE_NAME, tableNameNotNull);
         Table table =
-                FileStoreTestUtils.createFileStoreTable(conf, rowType, partitionKeys, primaryKeys);
+                FileStoreTestUtils.createFileStoreTable(
+                        conf, rowType, partitionKeys, primaryKeys, identifier);
 
         return writeData(table, tablePath, data);
     }
 
     private String createAppendOnlyExternalTable(
             RowType rowType, List<String> partitionKeys, List<InternalRow> data) throws Exception {
+        return createAppendOnlyExternalTable(rowType, partitionKeys, data, "");
+    }
+
+    private String createAppendOnlyExternalTable(
+            RowType rowType, List<String> partitionKeys, List<InternalRow> data, String tableName)
+            throws Exception {
         String path = folder.newFolder().toURI().toString();
-        String tablePath = String.format("%s/default.db/hive_test_table", path);
+        String tableNameNotNull =
+                StringUtils.isNullOrWhitespaceOnly(tableName) ? TABLE_NAME : tableName;
+        String tablePath = String.format("%s/default.db/%s", path, tableNameNotNull);
         Options conf = new Options();
         conf.set(CatalogOptions.WAREHOUSE, path);
         conf.set(CoreOptions.BUCKET, 2);
         conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
         conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+        Identifier identifier = Identifier.create(DATABASE_NAME, tableNameNotNull);
         Table table =
                 FileStoreTestUtils.createFileStoreTable(
-                        conf, rowType, partitionKeys, Collections.emptyList());
+                        conf, rowType, partitionKeys, Collections.emptyList(), identifier);
 
         return writeData(table, tablePath, data);
     }
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/RandomGenericRowDataGenerator.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/RandomGenericRowDataGenerator.java
index e9ab13bfe..74d4c92c0 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/RandomGenericRowDataGenerator.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/RandomGenericRowDataGenerator.java
@@ -133,6 +133,7 @@ public class RandomGenericRowDataGenerator {
                                                     FIELD_COMMENTS.get(i)))
                             .collect(Collectors.toList()));
 
+    // todo generate with schema
     public static GenericRow generate() {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         byte[] randomBytes = new byte[random.nextInt(20)];
@@ -185,11 +186,13 @@ public class RandomGenericRowDataGenerator {
         return builder.toString();
     }
 
-    private static BigDecimal randomBigDecimal(int precision, int scale) {
+    public static BigDecimal randomBigDecimal(int precision, int scale) {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         StringBuilder builder = new StringBuilder();
+        // to avoid starting with  0
         for (int i = 0; i < precision - scale; i++) {
-            builder.append((char) (random.nextInt(10) + '0'));
+            int t = random.nextInt(10);
+            builder.append(t == 0 ? 1 : t);
         }
         builder.append('.');
         for (int i = 0; i < scale; i++) {


Reply via email to