This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2b4f24ff39 [hive] Fix insert into static partitions on managed Paimon tables (#7824)
2b4f24ff39 is described below
commit 2b4f24ff39432da46e28884b423a74d29e8cf7b5
Author: Arnav Balyan <[email protected]>
AuthorDate: Thu Jun 4 11:26:16 2026 +0530
[hive] Fix insert into static partitions on managed Paimon tables (#7824)
---
.../paimon/hive/mapred/PaimonOutputFormat.java | 124 ++++++++++++++-
.../hive/mapred/PartitionedRecordWriter.java | 92 +++++++++++
.../org/apache/paimon/hive/HiveWriteITCase.java | 69 +++++++++
.../paimon/hive/mapred/PaimonOutputFormatTest.java | 169 +++++++++++++++++++++
4 files changed, 447 insertions(+), 7 deletions(-)
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
index ef1ee687ec..9254ba91f6 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputFormat.java
@@ -19,10 +19,15 @@
package org.apache.paimon.hive.mapred;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.hive.RowDataContainer;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.PartitionPathUtils;
+import org.apache.paimon.utils.TypeUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -37,7 +42,10 @@ import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.util.Progressable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -69,20 +77,122 @@ public class PaimonOutputFormat
Properties properties,
Progressable progressable)
throws IOException {
- return writer(jobConf);
+ FileStoreTable table = forWriteOnly(createFileStoreTable(jobConf));
+ PaimonRecordWriter inner = writer(jobConf, table);
+
+ GenericRow staticPartitionRow = buildStaticPartitionRow(path,
table.schema());
+ return staticPartitionRow == null
+ ? inner
+ : new PartitionedRecordWriter(
+ inner, staticPartitionRow,
table.schema().fields().size());
}
+    /**
+     * Creates a record writer for the table built from {@code jobConf} by {@code
+     * createFileStoreTable}, with {@link CoreOptions#WRITE_ONLY} forced on.
+     */
private static PaimonRecordWriter writer(JobConf jobConf) {
+ return writer(jobConf, forWriteOnly(createFileStoreTable(jobConf)));
+ }
+
+    /**
+     * Creates a {@link PaimonRecordWriter} over a new batch write of {@code table}, tagged with the
+     * task attempt id obtained via {@code TezUtil.taskAttemptWrapper(jobConf)}. The table is used
+     * as given; the callers in this class pass a copy produced by {@code forWriteOnly}.
+     */
+ private static PaimonRecordWriter writer(JobConf jobConf, FileStoreTable
table) {
TaskAttemptID taskAttemptID = TezUtil.taskAttemptWrapper(jobConf);
+ BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
+ BatchTableWrite batchTableWrite = batchWriteBuilder.newWrite();
+ return new PaimonRecordWriter(batchTableWrite, taskAttemptID,
table.name());
+ }
- FileStoreTable table = createFileStoreTable(jobConf);
- // force write-only = true
+    /** Returns a copy of {@code table} with {@link CoreOptions#WRITE_ONLY} set to {@code true}. */
+ private static FileStoreTable forWriteOnly(FileStoreTable table) {
Map<String, String> newOptions =
Collections.singletonMap(CoreOptions.WRITE_ONLY.key(),
Boolean.TRUE.toString());
- FileStoreTable copy = table.copy(newOptions);
- BatchWriteBuilder batchWriteBuilder = copy.newBatchWriteBuilder();
- BatchTableWrite batchTableWrite = batchWriteBuilder.newWrite();
+ return table.copy(newOptions);
+ }
+
+    /**
+     * Builds the constant partition row for a static-partition write from the Hive output path.
+     *
+     * <p>The {@code key=value} segments of {@code path} are extracted with {@link
+     * PartitionPathUtils#extractPartitionSpecFromPath}. Each partition key of the table is looked
+     * up ignoring case, and its value is cast to the column type with {@link
+     * TypeUtils#castFromString}.
+     *
+     * @param path output path handed to the Hive record writer
+     * @param schema schema of the Paimon table being written
+     * @return the partition values in partition-key order, or {@code null} if the table is not
+     *     partitioned or the path names none of its partition keys
+     * @throws IllegalArgumentException if the partition keys are not the trailing schema columns,
+     *     or if the path provides values for only some of the partition keys
+     */
+    static GenericRow buildStaticPartitionRow(Path path, TableSchema schema) {
+        List<String> partitionKeys = schema.partitionKeys();
+        if (partitionKeys.isEmpty()) {
+            return null;
+        }
+
+        LinkedHashMap<String, String> spec =
+                PartitionPathUtils.extractPartitionSpecFromPath(
+                        new org.apache.paimon.fs.Path(path.toString()));
+
+        // Resolve every key up front. Only a path that names at least one of this table's
+        // partition keys is treated as a static-partition path. Unrelated "k=v" directory names
+        // elsewhere in the path (e.g. in a parent directory) are handled like a path without
+        // partition segments instead of triggering the mixed-partition error.
+        String[] rawValues = new String[partitionKeys.size()];
+        String firstMissingKey = null;
+        boolean anyKeyPresent = false;
+        for (int i = 0; i < partitionKeys.size(); i++) {
+            String key = partitionKeys.get(i);
+            rawValues[i] = lookupCaseInsensitive(spec, key);
+            if (rawValues[i] != null) {
+                anyKeyPresent = true;
+            } else if (firstMissingKey == null) {
+                firstMissingKey = key;
+            }
+        }
+        if (!anyKeyPresent) {
+            return null;
+        }
+
+        assertPartitionKeysAreSchemaTail(schema.fields(), partitionKeys);
+
+        if (firstMissingKey != null) {
+            throw new IllegalArgumentException(
+                    "Mixed static and dynamic partition writes are not supported for managed "
+                            + "Paimon Hive tables. Partition key '"
+                            + firstMissingKey
+                            + "' has no value in the static partition path "
+                            + path
+                            + ". Provide values for every partition key in the INSERT.");
+        }
+
+        List<DataField> fields = schema.fields();
+        GenericRow row = new GenericRow(partitionKeys.size());
+        for (int i = 0; i < partitionKeys.size(); i++) {
+            DataField field = findField(fields, partitionKeys.get(i));
+            row.setField(i, TypeUtils.castFromString(rawValues[i], field.type()));
+        }
+        return row;
+    }
+
+    /**
+     * Checks that the partition keys are the last columns of the schema, in declared order
+     * (names compared ignoring case).
+     *
+     * @param fields all columns of the table schema
+     * @param partitionKeys partition key names in declared order
+     * @throws IllegalArgumentException if the schema has fewer columns than partition keys, or if
+     *     a trailing column does not match the corresponding partition key
+     */
+    private static void assertPartitionKeysAreSchemaTail(
+            List<DataField> fields, List<String> partitionKeys) {
+        int keyCount = partitionKeys.size();
+        int firstKeyPos = fields.size() - keyCount;
+        if (firstKeyPos < 0) {
+            throw new IllegalArgumentException(
+                    "Table schema has "
+                            + fields.size()
+                            + " columns but the schema declares "
+                            + keyCount
+                            + " partition keys. Static partition write requires partition keys "
+                            + "to be the trailing slice of the schema.".replace("the trailing", "a trailing"));
+        }
+        int pos = firstKeyPos;
+        for (String expected : partitionKeys) {
+            String actual = fields.get(pos).name();
+            if (!actual.equalsIgnoreCase(expected)) {
+                // Only built on the failure path, to make the error message self-explanatory.
+                List<String> schemaNames = new ArrayList<>(fields.size());
+                fields.forEach(field -> schemaNames.add(field.name()));
+                throw new IllegalArgumentException(
+                        "Static partition write requires partition keys to be the trailing "
+                                + "columns of the schema in declared order. Expected column at "
+                                + "position "
+                                + pos
+                                + " to be '"
+                                + expected
+                                + "', found '"
+                                + actual
+                                + "'. Schema: "
+                                + schemaNames
+                                + ", partition keys: "
+                                + partitionKeys
+                                + ".");
+            }
+            pos++;
+        }
+    }
+
+    /**
+     * Returns the value mapped to the first key of {@code spec}, in iteration order, that equals
+     * {@code key} ignoring case, or {@code null} when no key matches.
+     */
+    private static String lookupCaseInsensitive(Map<String, String> spec, String key) {
+        for (String candidate : spec.keySet()) {
+            if (candidate.equalsIgnoreCase(key)) {
+                return spec.get(candidate);
+            }
+        }
+        return null;
+    }
- return new PaimonRecordWriter(batchTableWrite, taskAttemptID,
copy.name());
+    /**
+     * Returns the first schema field whose name equals {@code name} ignoring case.
+     *
+     * @throws IllegalStateException if no field has that name
+     */
+    private static DataField findField(List<DataField> fields, String name) {
+        for (int i = 0; i < fields.size(); i++) {
+            DataField candidate = fields.get(i);
+            if (candidate.name().equalsIgnoreCase(name)) {
+                return candidate;
+            }
+        }
+        throw new IllegalStateException("Partition column not found in schema: " + name);
}
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PartitionedRecordWriter.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PartitionedRecordWriter.java
new file mode 100644
index 0000000000..10c0bd2a41
--- /dev/null
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PartitionedRecordWriter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.mapred;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.hive.RowDataContainer;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+
+/**
+ * Record writer for inserts into a static partition of a partitioned Paimon table.
+ *
+ * <p>Rows carrying only the non-partition columns are extended with the constant partition values
+ * derived from the output path. Rows that already carry every schema column are written
+ * unchanged. Both kinds go through the batch table write of the wrapped {@link
+ * PaimonRecordWriter}, and closing is delegated to it.
+ */
+class PartitionedRecordWriter
+        implements FileSinkOperator.RecordWriter,
+                org.apache.hadoop.mapred.RecordWriter<NullWritable, RowDataContainer> {
+
+    private final PaimonRecordWriter inner;
+    /** Constant partition values appended after the data columns of data-only rows. */
+    private final GenericRow partitionRow;
+    /** Width of a row that carries only the non-partition columns. */
+    private final int dataOnlyWidth;
+    /** Width of a row that carries every column of the table schema. */
+    private final int fullSchemaWidth;
+    /** View reused across writes that concatenates a data-only row with {@link #partitionRow}. */
+    private final JoinedRow joinedRow = new JoinedRow();
+
+    PartitionedRecordWriter(
+            PaimonRecordWriter inner, GenericRow partitionRow, int fullSchemaWidth) {
+        this.inner = inner;
+        this.partitionRow = partitionRow;
+        this.fullSchemaWidth = fullSchemaWidth;
+        this.dataOnlyWidth = fullSchemaWidth - partitionRow.getFieldCount();
+    }
+
+    @Override
+    public void write(Writable row) throws IOException {
+        InternalRow source = ((RowDataContainer) row).get();
+        InternalRow toWrite;
+        int width = source.getFieldCount();
+        if (width == dataOnlyWidth) {
+            joinedRow.replace(source, partitionRow);
+            // Set the kind explicitly so that the joined branch preserves the incoming row kind,
+            // just as the full-width branch does by writing the source row itself.
+            joinedRow.setRowKind(source.getRowKind());
+            toWrite = joinedRow;
+        } else if (width == fullSchemaWidth) {
+            toWrite = source;
+        } else {
+            throw new IOException(
+                    "Unexpected row width "
+                            + width
+                            + "; expected "
+                            + dataOnlyWidth
+                            + " (static partition path) or "
+                            + fullSchemaWidth
+                            + " (full schema)");
+        }
+        try {
+            inner.batchTableWrite().write(toWrite);
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public void write(NullWritable key, RowDataContainer value) throws IOException {
+        write(value);
+    }
+
+    @Override
+    public void close(boolean abort) throws IOException {
+        inner.close(abort);
+    }
+
+    @Override
+    public void close(Reporter reporter) throws IOException {
+        inner.close(reporter);
+    }
+}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
index b124cef1b1..73a83e9d9f 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
@@ -196,6 +196,75 @@ public class HiveWriteITCase extends HiveTestBase {
assertThat(select).containsExactly("1\t2\t3\tHello", "4\t5\t6\tFine");
}
+    @Test
+    public void testInsertStaticPartition() throws Exception {
+        // Unpartitioned source table holding (id, name, salary, department).
+        InternalRow alice =
+                GenericRow.of(
+                        1,
+                        BinaryString.fromString("Alice"),
+                        5000.0,
+                        BinaryString.fromString("IT"));
+        InternalRow bob =
+                GenericRow.of(
+                        2,
+                        BinaryString.fromString("Bob"),
+                        6000.0,
+                        BinaryString.fromString("HR"));
+        InternalRow charlie =
+                GenericRow.of(
+                        3,
+                        BinaryString.fromString("Charlie"),
+                        5500.0,
+                        BinaryString.fromString("IT"));
+        DataType[] columnTypes = {
+            DataTypes.INT(), DataTypes.STRING(), DataTypes.DOUBLE(), DataTypes.STRING()
+        };
+        String[] columnNames = {"id", "name", "salary", "department"};
+        String sourceTable =
+                createAppendOnlyExternalTable(
+                        RowType.of(columnTypes, columnNames),
+                        Collections.emptyList(),
+                        Arrays.asList(alice, bob, charlie));
+
+        // Paimon table partitioned by dt, created through the Hive storage handler.
+        String targetTable =
+                "static_partition_insert_" + UUID.randomUUID().toString().replace('-', '_');
+        hiveShell.execute("SET hive.metastore.warehouse.dir=" + folder.newFolder().toURI());
+        String createTable =
+                String.join(
+                        "\n",
+                        "CREATE TABLE " + targetTable + " (",
+                        " id INT,",
+                        " name STRING,",
+                        " salary DOUBLE,",
+                        " department STRING",
+                        ")",
+                        "PARTITIONED BY (dt STRING)",
+                        "STORED BY '" + PaimonStorageHandler.class.getName() + "'",
+                        "TBLPROPERTIES (",
+                        " 'primary-key' = 'id,dt',",
+                        " 'bucket' = '2',",
+                        " 'file.format' = 'parquet'",
+                        ")");
+        hiveShell.execute(createTable);
+
+        // Static partition: the SELECT list carries only the data columns.
+        hiveShell.execute(
+                "INSERT INTO TABLE "
+                        + targetTable
+                        + " PARTITION (dt='2026') "
+                        + "SELECT id, name, salary, department FROM "
+                        + sourceTable);
+
+        assertThat(hiveShell.executeQuery("SELECT * FROM " + targetTable + " ORDER BY id"))
+                .containsExactly(
+                        "1\tAlice\t5000.0\tIT\t2026",
+                        "2\tBob\t6000.0\tHR\t2026",
+                        "3\tCharlie\t5500.0\tIT\t2026");
+    }
+
@Test
public void testWriteOnlyWithChangeLogTableOption() throws Exception {
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonOutputFormatTest.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonOutputFormatTest.java
new file mode 100644
index 0000000000..ee31675ab5
--- /dev/null
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonOutputFormatTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.mapred;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for how {@link PaimonOutputFormat} derives static partition values from output paths. */
+public class PaimonOutputFormatTest {
+
+    @Test
+    public void buildsRowMatchingUserBugReport() {
+        TableSchema schema =
+                schema(
+                        Arrays.asList(
+                                new DataField(0, "id", DataTypes.INT().notNull()),
+                                new DataField(1, "name", DataTypes.STRING()),
+                                new DataField(2, "salary", DataTypes.DOUBLE()),
+                                new DataField(3, "department", DataTypes.STRING()),
+                                new DataField(4, "dt", DataTypes.STRING().notNull())),
+                        Collections.singletonList("dt"),
+                        Arrays.asList("id", "dt"));
+
+        GenericRow row =
+                PaimonOutputFormat.buildStaticPartitionRow(
+                        new Path("/wh/test_paimon2/dt=2026/file"), schema);
+
+        assertThat(row).isNotNull();
+        assertThat(row.getFieldCount()).isEqualTo(1);
+        assertThat(row.getString(0)).isEqualTo(BinaryString.fromString("2026"));
+    }
+
+    @Test
+    public void buildRowConvertsTypedPartitionValues() {
+        TableSchema schema = regionYearSchema();
+
+        GenericRow row =
+                PaimonOutputFormat.buildStaticPartitionRow(
+                        new Path("/wh/t/region=us/year=2026/file"), schema);
+
+        assertThat(row).isNotNull();
+        assertThat(row.getString(0)).isEqualTo(BinaryString.fromString("us"));
+        assertThat(row.getInt(1)).isEqualTo(2026);
+    }
+
+    @Test
+    public void buildRowReturnsNullForUnpartitionedTable() {
+        TableSchema unpartitioned =
+                schema(
+                        Collections.singletonList(new DataField(0, "v", DataTypes.INT())),
+                        Collections.emptyList(),
+                        Collections.emptyList());
+
+        GenericRow row =
+                PaimonOutputFormat.buildStaticPartitionRow(new Path("/wh/t/file"), unpartitioned);
+        assertThat(row).isNull();
+    }
+
+    @Test
+    public void buildRowReturnsNullWhenPathHasNoPartitionSegments() {
+        TableSchema schema =
+                schema(
+                        Arrays.asList(
+                                new DataField(0, "v", DataTypes.INT()),
+                                new DataField(1, "dt", DataTypes.STRING().notNull())),
+                        Collections.singletonList("dt"),
+                        Collections.emptyList());
+
+        GenericRow row =
+                PaimonOutputFormat.buildStaticPartitionRow(new Path("/wh/t/_tmp/file"), schema);
+        assertThat(row).isNull();
+    }
+
+    @Test
+    public void buildRowFailsWhenPartitionKeysNotAtSchemaTail() {
+        TableSchema schema =
+                schema(
+                        Arrays.asList(
+                                new DataField(0, "id", DataTypes.INT().notNull()),
+                                new DataField(1, "dt", DataTypes.STRING().notNull()),
+                                new DataField(2, "name", DataTypes.STRING())),
+                        Collections.singletonList("dt"),
+                        Collections.emptyList());
+        Path path = new Path("/wh/t/dt=2026/file");
+
+        assertThatThrownBy(() -> PaimonOutputFormat.buildStaticPartitionRow(path, schema))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("trailing columns")
+                .hasMessageContaining("'dt'");
+    }
+
+    @Test
+    public void buildRowFailsOnMixedStaticAndDynamicPartition() {
+        TableSchema schema = regionYearSchema();
+        Path path = new Path("/wh/t/region=us/file");
+
+        assertThatThrownBy(() -> PaimonOutputFormat.buildStaticPartitionRow(path, schema))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Mixed static and dynamic partition");
+    }
+
+    /** Schema (v, region, year) partitioned by (region, year), without primary keys. */
+    private static TableSchema regionYearSchema() {
+        return schema(
+                Arrays.asList(
+                        new DataField(0, "v", DataTypes.INT()),
+                        new DataField(1, "region", DataTypes.STRING().notNull()),
+                        new DataField(2, "year", DataTypes.INT().notNull())),
+                Arrays.asList("region", "year"),
+                Collections.emptyList());
+    }
+
+    /**
+     * Builds a schema with id 0, no options and an empty comment. The highest field id is taken
+     * as the last field's index, which matches the 0..n-1 field ids used by every test here.
+     */
+    private static TableSchema schema(
+            java.util.List<DataField> fields,
+            java.util.List<String> partitionKeys,
+            java.util.List<String> primaryKeys) {
+        return new TableSchema(
+                0,
+                fields,
+                fields.size() - 1,
+                partitionKeys,
+                primaryKeys,
+                Collections.emptyMap(),
+                "");
+    }
+}