This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a3fc947d5 [core] Add immutable id for system field (#4227)
a3fc947d5 is described below
commit a3fc947d5a4916b6626a783c39104154a70bf257
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Sep 23 17:12:00 2024 +0800
[core] Add immutable id for system field (#4227)
---
.../java/org/apache/paimon/table/SystemFields.java | 52 ++++++++++++++++
.../main/java/org/apache/paimon/types/RowType.java | 6 +-
.../src/main/java/org/apache/paimon/KeyValue.java | 69 ++++------------------
.../apache/paimon/schema/SchemaEvolutionUtil.java | 9 +--
.../org/apache/paimon/schema/SchemaValidation.java | 4 +-
.../org/apache/paimon/schema/SystemColumns.java | 36 -----------
.../apache/paimon/table/PrimaryKeyTableUtils.java | 5 +-
.../apache/paimon/mergetree/MergeTreeTestBase.java | 14 +----
.../paimon/operation/MergeFileSplitReadTest.java | 2 +-
.../apache/paimon/table/SchemaEvolutionTest.java | 21 +------
.../paimon/flink/sink/LocalMergeOperator.java | 4 +-
11 files changed, 85 insertions(+), 137 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/table/SystemFields.java b/paimon-common/src/main/java/org/apache/paimon/table/SystemFields.java
new file mode 100644
index 000000000..856e61de6
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/table/SystemFields.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** System fields. */
+public class SystemFields {
+
+ public static final int SYSTEM_FIELD_ID_START = Integer.MAX_VALUE / 2;
+
+ public static final String KEY_FIELD_PREFIX = "_KEY_";
+ public static final int KEY_FIELD_ID_START = SYSTEM_FIELD_ID_START;
+
+ public static final DataField SEQUENCE_NUMBER =
+ new DataField(Integer.MAX_VALUE - 1, "_SEQUENCE_NUMBER", DataTypes.BIGINT().notNull());
+
+ public static final DataField VALUE_KIND =
+ new DataField(Integer.MAX_VALUE - 2, "_VALUE_KIND", DataTypes.TINYINT().notNull());
+
+ public static final DataField LEVEL =
+ new DataField(Integer.MAX_VALUE - 3, "_LEVEL", DataTypes.INT().notNull());
+
+ public static final Set<String> SYSTEM_FIELD_NAMES =
+ Stream.of(SEQUENCE_NUMBER.name(), VALUE_KIND.name(), LEVEL.name())
+ .collect(Collectors.toSet());
+
+ public static boolean isSystemField(int fieldId) {
+ return fieldId >= SYSTEM_FIELD_ID_START;
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
index 0fa6a9c1b..dc7c7ae32 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
@@ -20,6 +20,7 @@ package org.apache.paimon.types;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.table.SystemFields;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
@@ -256,7 +257,10 @@ public final class RowType extends DataType {
public static int currentHighestFieldId(List<DataField> fields) {
Set<Integer> fieldIds = new HashSet<>();
new RowType(fields).collectFieldIds(fieldIds);
- return fieldIds.stream().max(Integer::compareTo).orElse(-1);
+ return fieldIds.stream()
+ .filter(i -> !SystemFields.isSystemField(i))
+ .max(Integer::compareTo)
+ .orElse(-1);
}
public static Builder builder() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
index 31d3749bf..f2a6c0bde 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java
@@ -21,12 +21,9 @@ package org.apache.paimon;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.utils.InternalRowUtils;
import java.util.ArrayList;
@@ -34,10 +31,9 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static org.apache.paimon.schema.SystemColumns.LEVEL;
-import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER;
-import static org.apache.paimon.schema.SystemColumns.VALUE_KIND;
-import static org.apache.paimon.utils.Preconditions.checkState;
+import static org.apache.paimon.table.SystemFields.LEVEL;
+import static org.apache.paimon.table.SystemFields.SEQUENCE_NUMBER;
+import static org.apache.paimon.table.SystemFields.VALUE_KIND;
/**
* A key value, including user key, sequence number, value kind and value. This object can be
@@ -115,70 +111,29 @@ public class KeyValue {
}
public static RowType schema(RowType keyType, RowType valueType) {
- List<DataField> fields = new ArrayList<>(keyType.getFields());
- fields.add(new DataField(0, SEQUENCE_NUMBER, new BigIntType(false)));
- fields.add(new DataField(1, VALUE_KIND, new TinyIntType(false)));
- fields.addAll(valueType.getFields());
- return new RowType(fields);
+ return new RowType(createKeyValueFields(keyType.getFields(), valueType.getFields()));
}
public static RowType schemaWithLevel(RowType keyType, RowType valueType) {
- RowType.Builder builder = RowType.builder();
- schema(keyType, valueType)
- .getFields()
- .forEach(f -> builder.field(f.name(), f.type(), f.description()));
- builder.field(LEVEL, DataTypes.INT().notNull());
- return builder.build();
+ List<DataField> fields = new ArrayList<>(schema(keyType, valueType).getFields());
+ fields.add(LEVEL);
+ return new RowType(fields);
}
/**
- * Create key-value fields, we need to add a const value to the id of value field to ensure that
- * they are consistent when compared by field id. For example, there are two table with key
- * value fields as follows
- *
- * <ul>
- * <li>Table1 key fields: 1->a, 2->b, 3->c; value fields: 0->value_count
- * <li>Table2 key fields: 1->c, 3->d, 4->a, 5->b; value fields: 0->value_count
- * </ul>
- *
- * <p>We will use 5 as maxKeyId, and create fields for Table1/Table2 as follows
- *
- * <ul>
- * <li>Table1 fields: 1->a, 2->b, 3->c, 6->seq, 7->kind, 8->value_count
- * <li>Table2 fields: 1->c, 3->d, 4->a, 5->b, 6->seq, 7->kind, 8->value_count
- * </ul>
- *
- * <p>Then we can compare these two table fields with the field id.
+ * Create key-value fields.
*
* @param keyFields the key fields
* @param valueFields the value fields
- * @param maxKeyId the max key id
* @return the table fields
*/
public static List<DataField> createKeyValueFields(
- List<DataField> keyFields, List<DataField> valueFields, final int maxKeyId) {
- checkState(maxKeyId >= keyFields.stream().mapToInt(DataField::id).max().orElse(0));
-
+ List<DataField> keyFields, List<DataField> valueFields) {
List<DataField> fields = new ArrayList<>(keyFields.size() + valueFields.size() + 2);
fields.addAll(keyFields);
- fields.add(
- new DataField(
- maxKeyId + 1,
- SEQUENCE_NUMBER,
- new org.apache.paimon.types.BigIntType(false)));
- fields.add(
- new DataField(
- maxKeyId + 2, VALUE_KIND, new org.apache.paimon.types.TinyIntType(false)));
- for (DataField valueField : valueFields) {
- DataField newValueField =
- new DataField(
- valueField.id() + maxKeyId + 3,
- valueField.name(),
- valueField.type(),
- valueField.description());
- fields.add(newValueField);
- }
-
+ fields.add(SEQUENCE_NUMBER);
+ fields.add(VALUE_KIND);
+ fields.addAll(valueFields);
return fields;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
index 6b724c74f..25b8cc63f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
@@ -217,14 +217,9 @@ public class SchemaEvolutionUtil {
int[] dataProjection,
List<DataField> dataKeyFields,
List<DataField> dataValueFields) {
- int maxKeyId =
- Math.max(
- tableKeyFields.stream().mapToInt(DataField::id).max().orElse(0),
- dataKeyFields.stream().mapToInt(DataField::id).max().orElse(0));
List<DataField> tableFields =
- KeyValue.createKeyValueFields(tableKeyFields, tableValueFields, maxKeyId);
- List<DataField> dataFields =
- KeyValue.createKeyValueFields(dataKeyFields, dataValueFields, maxKeyId);
+ KeyValue.createKeyValueFields(tableKeyFields, tableValueFields);
+ List<DataField> dataFields = KeyValue.createKeyValueFields(dataKeyFields, dataValueFields);
return createIndexCastMapping(tableProjection, tableFields, dataProjection, dataFields);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 72ea96024..c714b90d5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -68,8 +68,8 @@ import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
import static org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE;
import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
-import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
-import static org.apache.paimon.schema.SystemColumns.SYSTEM_FIELD_NAMES;
+import static org.apache.paimon.table.SystemFields.KEY_FIELD_PREFIX;
+import static org.apache.paimon.table.SystemFields.SYSTEM_FIELD_NAMES;
import static org.apache.paimon.types.DataTypeRoot.ARRAY;
import static org.apache.paimon.types.DataTypeRoot.MAP;
import static org.apache.paimon.types.DataTypeRoot.MULTISET;
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java b/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java
deleted file mode 100644
index f6350f44a..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.schema;
-
-import java.util.Arrays;
-import java.util.List;
-
-/** System columns for key value store. */
-public class SystemColumns {
-
- /** System field names. */
- public static final String KEY_FIELD_PREFIX = "_KEY_";
-
- public static final String VALUE_COUNT = "_VALUE_COUNT";
- public static final String SEQUENCE_NUMBER = "_SEQUENCE_NUMBER";
- public static final String VALUE_KIND = "_VALUE_KIND";
- public static final String LEVEL = "_LEVEL";
- public static final List<String> SYSTEM_FIELD_NAMES =
- Arrays.asList(VALUE_COUNT, SEQUENCE_NUMBER, VALUE_KIND);
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index c74ab1f1f..58cde07c3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -34,7 +34,8 @@ import org.apache.paimon.types.RowType;
import java.util.List;
import java.util.stream.Collectors;
-import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
+import static org.apache.paimon.table.SystemFields.KEY_FIELD_ID_START;
+import static org.apache.paimon.table.SystemFields.KEY_FIELD_PREFIX;
/** Utils for creating changelog table with primary keys. */
public class PrimaryKeyTableUtils {
@@ -45,7 +46,7 @@ public class PrimaryKeyTableUtils {
public static List<DataField> addKeyNamePrefix(List<DataField> keyFields) {
return keyFields.stream()
- .map(f -> f.newName(KEY_FIELD_PREFIX + f.name()))
+ .map(f -> f.newName(KEY_FIELD_PREFIX + f.name()).newId(f.id() + KEY_FIELD_ID_START))
.collect(Collectors.toList());
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 80080516e..f2a9c44dd 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -145,7 +145,7 @@ public abstract class MergeTreeTestBase {
options.get(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER) + 1);
this.options = new CoreOptions(options);
RowType keyType = new RowType(singletonList(new DataField(0, "k", new IntType())));
- RowType valueType = new RowType(singletonList(new DataField(0, "v", new IntType())));
+ RowType valueType = new RowType(singletonList(new DataField(1, "v", new IntType())));
String identifier = "avro";
FileFormat flushingAvro = new FlushingFileFormat(identifier);
@@ -161,20 +161,12 @@ public abstract class MergeTreeTestBase {
new KeyValueFieldsExtractor() {
@Override
public List<DataField> keyFields(TableSchema schema) {
- return Collections.singletonList(
- new DataField(
- 0,
- "k",
- new org.apache.paimon.types.IntType(false)));
+ return keyType.getFields();
}
@Override
public List<DataField> valueFields(TableSchema schema) {
- return Collections.singletonList(
- new DataField(
- 0,
- "v",
- new org.apache.paimon.types.IntType(false)));
+ return valueType.getFields();
}
},
new CoreOptions(new HashMap<>()));
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
index 1794e8aea..6e9a77398 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
@@ -128,7 +128,7 @@ public class MergeFileSplitReadTest {
public List<DataField> valueFields(TableSchema schema) {
return Collections.singletonList(
new DataField(
- 0,
+ 3,
"count",
new org.apache.paimon.types.BigIntType()));
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 8771377f2..2b20b25c0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -57,8 +57,8 @@ import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
-import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
-import static org.apache.paimon.schema.SystemColumns.SYSTEM_FIELD_NAMES;
+import static org.apache.paimon.table.SystemFields.KEY_FIELD_PREFIX;
+import static org.apache.paimon.table.SystemFields.SYSTEM_FIELD_NAMES;
import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -424,23 +424,6 @@ public class SchemaEvolutionTest {
@Test
public void testCreateAlterSystemField() throws Exception {
- Schema schema1 =
- new Schema(
- RowType.of(
- new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
- new String[] {"f0", "_VALUE_COUNT"})
- .getFields(),
- Collections.emptyList(),
- Collections.emptyList(),
- new HashMap<>(),
- "");
- assertThatThrownBy(() -> schemaManager.createTable(schema1))
- .isInstanceOf(IllegalStateException.class)
- .hasMessage(
- String.format(
- "Field name[%s] in schema cannot be exist in %s",
- "_VALUE_COUNT", SYSTEM_FIELD_NAMES));
-
Schema schema2 =
new Schema(
RowType.of(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index 18d440dbc..aba891e44 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -48,6 +48,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.List;
+import static org.apache.paimon.table.PrimaryKeyTableUtils.addKeyNamePrefix;
+
/**
* {@link AbstractStreamOperator} which buffer input record and apply merge function when the buffer
* is full. Mainly to resolve data skew on primary keys.
@@ -105,7 +107,7 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow>
// partition fields.
@Override
public List<DataField> keyFields(TableSchema schema) {
- return schema.primaryKeysFields();
+ return addKeyNamePrefix(schema.primaryKeysFields());
}
@Override