This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b4f24ff39 [hive] Fix insert into static partitions on managed Paimon tables (#7824)
2b4f24ff39 is described below

commit 2b4f24ff39432da46e28884b423a74d29e8cf7b5
Author: Arnav Balyan <[email protected]>
AuthorDate: Thu Jun 4 11:26:16 2026 +0530

    [hive] Fix insert into static partitions on managed Paimon tables (#7824)
---
 .../paimon/hive/mapred/PaimonOutputFormat.java     | 124 ++++++++++++++-
 .../hive/mapred/PartitionedRecordWriter.java       |  92 +++++++++++
 .../org/apache/paimon/hive/HiveWriteITCase.java    |  69 +++++++++
 .../paimon/hive/mapred/PaimonOutputFormatTest.java | 169 +++++++++++++++++++++
 4 files changed, 447 insertions(+), 7 deletions(-)

diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
index ef1ee687ec..9254ba91f6 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
@@ -19,10 +19,15 @@
 package org.apache.paimon.hive.mapred;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.hive.RowDataContainer;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.PartitionPathUtils;
+import org.apache.paimon.utils.TypeUtils;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -37,7 +42,10 @@ import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -69,20 +77,122 @@ public class PaimonOutputFormat
             Properties properties,
             Progressable progressable)
             throws IOException {
-        return writer(jobConf);
+        FileStoreTable table = forWriteOnly(createFileStoreTable(jobConf));
+        PaimonRecordWriter inner = writer(jobConf, table);
+
+        GenericRow staticPartitionRow = buildStaticPartitionRow(path, 
table.schema());
+        return staticPartitionRow == null
+                ? inner
+                : new PartitionedRecordWriter(
+                        inner, staticPartitionRow, 
table.schema().fields().size());
     }
 
+    /**
+     * Creates a {@link PaimonRecordWriter} for the table built from {@code jobConf}, after
+     * switching that table to write-only via {@link #forWriteOnly}.
+     */
     private static PaimonRecordWriter writer(JobConf jobConf) {
+        return writer(jobConf, forWriteOnly(createFileStoreTable(jobConf)));
+    }
+
+    /**
+     * Creates a {@link PaimonRecordWriter} backed by a new batch write of {@code table}.
+     *
+     * <p>This method does not force write-only itself; the callers in this class pass a table
+     * that has already gone through {@link #forWriteOnly}.
+     *
+     * @param jobConf job configuration, used to derive the task attempt id
+     * @param table table to write into
+     */
+    private static PaimonRecordWriter writer(JobConf jobConf, FileStoreTable 
table) {
         TaskAttemptID taskAttemptID = TezUtil.taskAttemptWrapper(jobConf);
+        BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
+        BatchTableWrite batchTableWrite = batchWriteBuilder.newWrite();
+        return new PaimonRecordWriter(batchTableWrite, taskAttemptID, 
table.name());
+    }
 
-        FileStoreTable table = createFileStoreTable(jobConf);
-        // force write-only = true
+    /**
+     * Returns a copy of {@code table} with {@link CoreOptions#WRITE_ONLY} forced to {@code true}.
+     *
+     * <p>NOTE(review): presumably forced so that Hive write tasks skip table maintenance such as
+     * compaction; confirm against the {@code write-only} option documentation.
+     */
+    private static FileStoreTable forWriteOnly(FileStoreTable table) {
         Map<String, String> newOptions =
                 Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), 
Boolean.TRUE.toString());
-        FileStoreTable copy = table.copy(newOptions);
-        BatchWriteBuilder batchWriteBuilder = copy.newBatchWriteBuilder();
-        BatchTableWrite batchTableWrite = batchWriteBuilder.newWrite();
+        return table.copy(newOptions);
+    }
+
+    /**
+     * Builds the partition row for a Hive INSERT that targets a static partition.
+     *
+     * <p>The {@code key=value} segments of the output path handed to the record writer (for
+     * example {@code .../dt=2026/000000_0}) are matched case-insensitively against the table's
+     * partition keys. Each value is cast to the declared type of its partition column. Path
+     * segments whose key is not a partition key (for example {@code key=value} directories that
+     * belong to the warehouse location) are ignored.
+     *
+     * @param path output path handed to the record writer
+     * @param schema schema of the Paimon table being written
+     * @return the partition values in partition-key order, or {@code null} when the table is
+     *     unpartitioned or the path carries a value for none of the partition keys
+     * @throws IllegalArgumentException if only some partition keys have a value in the path, or
+     *     if the partition keys are not the trailing columns of the schema
+     */
+    static GenericRow buildStaticPartitionRow(Path path, TableSchema schema) {
+        List<String> partitionKeys = schema.partitionKeys();
+        if (partitionKeys.isEmpty()) {
+            return null;
+        }
+
+        LinkedHashMap<String, String> spec =
+                PartitionPathUtils.extractPartitionSpecFromPath(
+                        new org.apache.paimon.fs.Path(path.toString()));
+
+        // Resolve every partition key up front. Only partition keys decide whether this is a
+        // static partition write; unrelated "k=v" directories elsewhere in the path must not
+        // turn an ordinary write into a failing "mixed static/dynamic" one.
+        List<String> rawValues = new ArrayList<>(partitionKeys.size());
+        boolean anyStaticValue = false;
+        for (String key : partitionKeys) {
+            String raw = lookupCaseInsensitive(spec, key);
+            if (raw != null) {
+                anyStaticValue = true;
+            }
+            rawValues.add(raw);
+        }
+        if (!anyStaticValue) {
+            return null;
+        }
+
+        // PartitionedRecordWriter appends this row after the data columns, which is only
+        // correct when the partition keys are the last columns of the schema.
+        assertPartitionKeysAreSchemaTail(schema.fields(), partitionKeys);
+
+        List<DataField> fields = schema.fields();
+        GenericRow row = new GenericRow(partitionKeys.size());
+        for (int i = 0; i < partitionKeys.size(); i++) {
+            String key = partitionKeys.get(i);
+            String raw = rawValues.get(i);
+            if (raw == null) {
+                throw new IllegalArgumentException(
+                        "Mixed static and dynamic partition writes are not supported for managed "
+                                + "Paimon Hive tables. Partition key '"
+                                + key
+                                + "' has no value in the static partition path "
+                                + path
+                                + ". Provide values for every partition key in the INSERT.");
+            }
+            DataField field = findField(fields, key);
+            row.setField(i, TypeUtils.castFromString(raw, field.type()));
+        }
+        return row;
+    }
+
+    private static void assertPartitionKeysAreSchemaTail(
+            List<DataField> fields, List<String> partitionKeys) {
+        int n = partitionKeys.size();
+        int start = fields.size() - n;
+        if (start < 0) {
+            throw new IllegalArgumentException(
+                    "Table schema has "
+                            + fields.size()
+                            + " columns but the schema declares "
+                            + n
+                            + " partition keys. Static partition write 
requires partition keys "
+                            + "to be a trailing slice of the schema.");
+        }
+        for (int i = 0; i < n; i++) {
+            String expected = partitionKeys.get(i);
+            String actual = fields.get(start + i).name();
+            if (!actual.equalsIgnoreCase(expected)) {
+                List<String> schemaNames = new ArrayList<>(fields.size());
+                for (DataField f : fields) {
+                    schemaNames.add(f.name());
+                }
+                throw new IllegalArgumentException(
+                        "Static partition write requires partition keys to be 
the trailing "
+                                + "columns of the schema in declared order. 
Expected column at "
+                                + "position "
+                                + (start + i)
+                                + " to be '"
+                                + expected
+                                + "', found '"
+                                + actual
+                                + "'. Schema: "
+                                + schemaNames
+                                + ", partition keys: "
+                                + partitionKeys
+                                + ".");
+            }
+        }
+    }
+
+    /**
+     * Looks up {@code key} in {@code spec} ignoring case.
+     *
+     * @return the value of the first entry, in iteration order, whose key equals {@code key}
+     *     ignoring case, or {@code null} if no entry matches
+     */
+    private static String lookupCaseInsensitive(Map<String, String> spec, String key) {
+        String match = null;
+        for (String candidate : spec.keySet()) {
+            if (candidate.equalsIgnoreCase(key)) {
+                match = spec.get(candidate);
+                break;
+            }
+        }
+        return match;
+    }
 
-        return new PaimonRecordWriter(batchTableWrite, taskAttemptID, 
copy.name());
+    /**
+     * Returns the first field whose name equals {@code name} ignoring case.
+     *
+     * @throws IllegalStateException if no field in {@code fields} matches
+     */
+    private static DataField findField(List<DataField> fields, String name) {
+        for (DataField f : fields) {
+            if (f.name().equalsIgnoreCase(name)) {
+                return f;
+            }
+        }
+        throw new IllegalStateException("Partition column not found in schema: 
" + name);
     }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PartitionedRecordWriter.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PartitionedRecordWriter.java
new file mode 100644
index 0000000000..10c0bd2a41
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PartitionedRecordWriter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.mapred;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.hive.RowDataContainer;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+
+/**
+ * Record writer used when Hive writes into a static partition of a partitioned Paimon table.
+ *
+ * <p>Incoming rows contain either only the data columns or every column of the table schema.
+ * Data-only rows are extended with the pre-computed static partition values, which are appended
+ * after the data columns. Full-width rows are forwarded unchanged, and any other width is
+ * rejected. Closing is delegated to the wrapped {@link PaimonRecordWriter}.
+ */
+class PartitionedRecordWriter
+        implements FileSinkOperator.RecordWriter,
+                org.apache.hadoop.mapred.RecordWriter<NullWritable, RowDataContainer> {
+
+    private final PaimonRecordWriter inner;
+    private final GenericRow partitionRow;
+    private final int dataOnlyWidth;
+    private final int fullSchemaWidth;
+
+    // Reused for every data-only row instead of allocating one wrapper per record.
+    // NOTE(review): assumes BatchTableWrite#write does not retain the row instance; confirm.
+    private final JoinedRow joinedRow = new JoinedRow();
+
+    /**
+     * Creates a writer that appends {@code partitionRow} to every data-only row.
+     *
+     * @param inner writer whose batch write receives the rows and to which closing is delegated
+     * @param partitionRow static partition values, in partition-key order
+     * @param fullSchemaWidth number of columns in the table schema, partition columns included
+     */
+    PartitionedRecordWriter(
+            PaimonRecordWriter inner, GenericRow partitionRow, int fullSchemaWidth) {
+        this.inner = inner;
+        this.partitionRow = partitionRow;
+        this.fullSchemaWidth = fullSchemaWidth;
+        this.dataOnlyWidth = fullSchemaWidth - partitionRow.getFieldCount();
+    }
+
+    @Override
+    public void write(Writable row) throws IOException {
+        InternalRow source = ((RowDataContainer) row).get();
+        int width = source.getFieldCount();
+        InternalRow toWrite;
+        if (width == dataOnlyWidth) {
+            // JoinedRow keeps its own row kind instead of reading it from the joined rows, so
+            // carry the source row's kind over explicitly rather than silently using the default.
+            joinedRow.setRowKind(source.getRowKind());
+            joinedRow.replace(source, partitionRow);
+            toWrite = joinedRow;
+        } else if (width == fullSchemaWidth) {
+            toWrite = source;
+        } else {
+            throw new IOException(
+                    "Unexpected row width "
+                            + width
+                            + "; expected "
+                            + dataOnlyWidth
+                            + " (static partition path) or "
+                            + fullSchemaWidth
+                            + " (full schema)");
+        }
+        try {
+            inner.batchTableWrite().write(toWrite);
+        } catch (IOException e) {
+            // Already the declared exception type; do not wrap it a second time.
+            throw e;
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public void write(NullWritable key, RowDataContainer value) throws IOException {
+        write(value);
+    }
+
+    @Override
+    public void close(boolean abort) throws IOException {
+        inner.close(abort);
+    }
+
+    @Override
+    public void close(Reporter reporter) throws IOException {
+        inner.close(reporter);
+    }
+}
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
index b124cef1b1..73a83e9d9f 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
@@ -196,6 +196,75 @@ public class HiveWriteITCase extends HiveTestBase {
         assertThat(select).containsExactly("1\t2\t3\tHello", "4\t5\t6\tFine");
     }
 
+    @Test
+    public void testInsertStaticPartition() throws Exception {
+        // Source rows carry only the data columns; the partition value comes from the INSERT.
+        InternalRow alice =
+                GenericRow.of(
+                        1, BinaryString.fromString("Alice"), 5000.0, BinaryString.fromString("IT"));
+        InternalRow bob =
+                GenericRow.of(
+                        2, BinaryString.fromString("Bob"), 6000.0, BinaryString.fromString("HR"));
+        InternalRow charlie =
+                GenericRow.of(
+                        3,
+                        BinaryString.fromString("Charlie"),
+                        5500.0,
+                        BinaryString.fromString("IT"));
+        RowType sourceType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(),
+                            DataTypes.STRING(),
+                            DataTypes.DOUBLE(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "salary", "department"});
+        String sourceTableName =
+                createAppendOnlyExternalTable(
+                        sourceType, Collections.emptyList(), Arrays.asList(alice, bob, charlie));
+
+        String tableName =
+                "static_partition_insert_" + UUID.randomUUID().toString().replace('-', '_');
+        hiveShell.execute("SET hive.metastore.warehouse.dir=" + folder.newFolder().toURI());
+
+        // Managed Paimon table whose only partition column (dt) is not part of the SELECT list.
+        String createTableDdl =
+                "CREATE TABLE "
+                        + tableName
+                        + " (\n"
+                        + "  id INT,\n"
+                        + "  name STRING,\n"
+                        + "  salary DOUBLE,\n"
+                        + "  department STRING\n"
+                        + ")\n"
+                        + "PARTITIONED BY (dt STRING)\n"
+                        + "STORED BY '"
+                        + PaimonStorageHandler.class.getName()
+                        + "'\n"
+                        + "TBLPROPERTIES (\n"
+                        + "  'primary-key' = 'id,dt',\n"
+                        + "  'bucket' = '2',\n"
+                        + "  'file.format' = 'parquet'\n"
+                        + ")";
+        hiveShell.execute(createTableDdl);
+
+        String insertSql =
+                "INSERT INTO TABLE "
+                        + tableName
+                        + " PARTITION (dt='2026') "
+                        + "SELECT id, name, salary, department FROM "
+                        + sourceTableName;
+        hiveShell.execute(insertSql);
+
+        assertThat(hiveShell.executeQuery("SELECT * FROM " + tableName + " ORDER BY id"))
+                .containsExactly(
+                        "1\tAlice\t5000.0\tIT\t2026",
+                        "2\tBob\t6000.0\tHR\t2026",
+                        "3\tCharlie\t5500.0\tIT\t2026");
+    }
+
     @Test
     public void testWriteOnlyWithChangeLogTableOption() throws Exception {
 
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonOutputFormatTest.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonOutputFormatTest.java
new file mode 100644
index 0000000000..ee31675ab5
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonOutputFormatTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.mapred;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link PaimonOutputFormat#buildStaticPartitionRow}. */
+public class PaimonOutputFormatTest {
+
+    @Test
+    public void buildRowForSingleStringPartitionKey() {
+        TableSchema schema =
+                new TableSchema(
+                        0,
+                        Arrays.asList(
+                                new DataField(0, "id", DataTypes.INT().notNull()),
+                                new DataField(1, "name", DataTypes.STRING()),
+                                new DataField(2, "salary", DataTypes.DOUBLE()),
+                                new DataField(3, "department", DataTypes.STRING()),
+                                new DataField(4, "dt", DataTypes.STRING().notNull())),
+                        4,
+                        Collections.singletonList("dt"),
+                        Arrays.asList("id", "dt"),
+                        Collections.emptyMap(),
+                        "");
+
+        Path path = new Path("/wh/test_paimon2/dt=2026/file");
+
+        GenericRow row = PaimonOutputFormat.buildStaticPartitionRow(path, schema);
+        assertThat(row).isNotNull();
+        assertThat(row.getFieldCount()).isEqualTo(1);
+        assertThat(row.getString(0)).isEqualTo(BinaryString.fromString("2026"));
+    }
+
+    @Test
+    public void buildRowConvertsTypedPartitionValues() {
+        TableSchema schema =
+                new TableSchema(
+                        0,
+                        Arrays.asList(
+                                new DataField(0, "v", DataTypes.INT()),
+                                new DataField(1, "region", DataTypes.STRING().notNull()),
+                                new DataField(2, "year", DataTypes.INT().notNull())),
+                        2,
+                        Arrays.asList("region", "year"),
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        "");
+
+        Path path = new Path("/wh/t/region=us/year=2026/file");
+
+        GenericRow row = PaimonOutputFormat.buildStaticPartitionRow(path, schema);
+        assertThat(row).isNotNull();
+        assertThat(row.getString(0)).isEqualTo(BinaryString.fromString("us"));
+        assertThat(row.getInt(1)).isEqualTo(2026);
+    }
+
+    @Test
+    public void buildRowMatchesPartitionKeyCaseInsensitively() {
+        TableSchema schema =
+                new TableSchema(
+                        0,
+                        Arrays.asList(
+                                new DataField(0, "v", DataTypes.INT()),
+                                new DataField(1, "dt", DataTypes.STRING().notNull())),
+                        1,
+                        Collections.singletonList("dt"),
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        "");
+
+        GenericRow row =
+                PaimonOutputFormat.buildStaticPartitionRow(new Path("/wh/t/DT=2026/file"), schema);
+        assertThat(row).isNotNull();
+        assertThat(row.getString(0)).isEqualTo(BinaryString.fromString("2026"));
+    }
+
+    @Test
+    public void buildRowIgnoresPathSegmentsThatAreNotPartitionKeys() {
+        TableSchema schema =
+                new TableSchema(
+                        0,
+                        Arrays.asList(
+                                new DataField(0, "v", DataTypes.INT()),
+                                new DataField(1, "dt", DataTypes.STRING().notNull())),
+                        1,
+                        Collections.singletonList("dt"),
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        "");
+
+        // "env=prod" belongs to the warehouse location, not to the table's partition spec.
+        Path path = new Path("/wh/env=prod/t/dt=2026/file");
+
+        GenericRow row = PaimonOutputFormat.buildStaticPartitionRow(path, schema);
+        assertThat(row).isNotNull();
+        assertThat(row.getFieldCount()).isEqualTo(1);
+        assertThat(row.getString(0)).isEqualTo(BinaryString.fromString("2026"));
+    }
+
+    @Test
+    public void buildRowReturnsNullForUnpartitionedTable() {
+        GenericRow row =
+                PaimonOutputFormat.buildStaticPartitionRow(
+                        new Path("/wh/t/file"), singleFieldSchema());
+        assertThat(row).isNull();
+    }
+
+    @Test
+    public void buildRowReturnsNullWhenPathHasNoPartitionSegments() {
+        TableSchema schema =
+                new TableSchema(
+                        0,
+                        Arrays.asList(
+                                new DataField(0, "v", DataTypes.INT()),
+                                new DataField(1, "dt", DataTypes.STRING().notNull())),
+                        1,
+                        Collections.singletonList("dt"),
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        "");
+
+        GenericRow row =
+                PaimonOutputFormat.buildStaticPartitionRow(new Path("/wh/t/_tmp/file"), schema);
+        assertThat(row).isNull();
+    }
+
+    @Test
+    public void buildRowFailsWhenPartitionKeysNotAtSchemaTail() {
+        TableSchema schema =
+                new TableSchema(
+                        0,
+                        Arrays.asList(
+                                new DataField(0, "id", DataTypes.INT().notNull()),
+                                new DataField(1, "dt", DataTypes.STRING().notNull()),
+                                new DataField(2, "name", DataTypes.STRING())),
+                        2,
+                        Collections.singletonList("dt"),
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        "");
+
+        Path path = new Path("/wh/t/dt=2026/file");
+
+        assertThatThrownBy(() -> PaimonOutputFormat.buildStaticPartitionRow(path, schema))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("trailing columns")
+                .hasMessageContaining("'dt'");
+    }
+
+    @Test
+    public void buildRowFailsOnMixedStaticAndDynamicPartition() {
+        TableSchema schema =
+                new TableSchema(
+                        0,
+                        Arrays.asList(
+                                new DataField(0, "v", DataTypes.INT()),
+                                new DataField(1, "region", DataTypes.STRING().notNull()),
+                                new DataField(2, "year", DataTypes.INT().notNull())),
+                        2,
+                        Arrays.asList("region", "year"),
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        "");
+
+        Path path = new Path("/wh/t/region=us/file");
+
+        assertThatThrownBy(() -> PaimonOutputFormat.buildStaticPartitionRow(path, schema))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Mixed static and dynamic partition");
+    }
+
+    /** Unpartitioned schema with a single nullable INT column. */
+    private static TableSchema singleFieldSchema() {
+        return new TableSchema(
+                0,
+                Collections.singletonList(new DataField(0, "v", DataTypes.INT())),
+                0,
+                Collections.emptyList(),
+                Collections.emptyList(),
+                Collections.emptyMap(),
+                "");
+    }
+}

Reply via email to