This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4322595df [FLINK-31558] Support updating schema type with restrictions in CDC sink
4322595df is described below
commit 4322595dfc1809199e3a815c7fa82ccb0750165a
Author: tsreaper <[email protected]>
AuthorDate: Wed Mar 22 20:02:26 2023 +0800
[FLINK-31558] Support updating schema type with restrictions in CDC sink
This closes #683.
---
.../java/org/apache/paimon/utils/TypeUtils.java | 8 +-
.../sink/cdc/SchemaAwareStoreWriteOperator.java | 44 +++--
.../sink/cdc/SchemaChangeProcessFunction.java | 100 ++++++++++--
.../paimon/flink/sink/cdc/FlinkCdcSinkITCase.java | 36 ++++-
.../cdc/SchemaAwareStoreWriteOperatorTest.java | 179 +++++++++++++++------
5 files changed, 289 insertions(+), 78 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index 888c7ebf3..71fb9378b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -85,7 +85,13 @@ public class TypeUtils {
case BIGINT:
return Long.valueOf(s);
case FLOAT:
- return Float.valueOf(s);
+ double d = Double.parseDouble(s);
+ if (d == ((float) d)) {
+ return (float) d;
+ } else {
+ throw new NumberFormatException(
+ s + " cannot be cast to float due to precision
loss");
+ }
case DOUBLE:
return Double.valueOf(s);
case DATE:
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
index 4c109ca93..d529c7089 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
@@ -35,6 +35,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.time.Duration;
+import java.util.HashMap;
import java.util.Map;
/**
@@ -62,12 +63,12 @@ public class SchemaAwareStoreWriteOperator extends
AbstractStoreWriteOperator<Cd
@Override
protected SinkRecord processRecord(CdcRecord record) throws Exception {
- Map<String, String> fields = record.fields();
-
- if (!schemaMatched(fields)) {
+ Map<String, Object> convertedFields = tryConvert(record.fields());
+ if (convertedFields == null) {
while (true) {
table = table.copyWithLatestSchema();
- if (schemaMatched(fields)) {
+ convertedFields = tryConvert(record.fields());
+ if (convertedFields != null) {
break;
}
Thread.sleep(retrySleepMillis);
@@ -78,14 +79,11 @@ public class SchemaAwareStoreWriteOperator extends
AbstractStoreWriteOperator<Cd
TableSchema schema = table.schema();
GenericRow row = new GenericRow(schema.fields().size());
row.setRowKind(record.kind());
- for (Map.Entry<String, String> field : fields.entrySet()) {
- String key = field.getKey();
- String value = field.getValue();
+ for (Map.Entry<String, Object> convertedField :
convertedFields.entrySet()) {
+ String key = convertedField.getKey();
+ Object value = convertedField.getValue();
int idx = schema.fieldNames().indexOf(key);
- DataType type = schema.fields().get(idx).type();
- // TODO TypeUtils.castFromString cannot deal with complex types
like arrays and maps.
- // Change type of CdcRecord#field if needed.
- row.setField(idx, TypeUtils.castFromString(value, type));
+ row.setField(idx, value);
}
try {
@@ -95,8 +93,26 @@ public class SchemaAwareStoreWriteOperator extends
AbstractStoreWriteOperator<Cd
}
}
- private boolean schemaMatched(Map<String, String> fields) {
- TableSchema currentSchema = table.schema();
- return currentSchema.fieldNames().containsAll(fields.keySet());
+ private Map<String, Object> tryConvert(Map<String, String> fields) {
+ Map<String, Object> converted = new HashMap<>();
+ TableSchema schema = table.schema();
+ for (Map.Entry<String, String> field : fields.entrySet()) {
+ String key = field.getKey();
+ String value = field.getValue();
+ int idx = schema.fieldNames().indexOf(key);
+ if (idx < 0) {
+ return null;
+ }
+ DataType type = schema.fields().get(idx).type();
+ // TODO TypeUtils.castFromString cannot deal with complex types
like arrays and maps.
+ // Change type of CdcRecord#field if needed.
+ try {
+ converted.put(key, TypeUtils.castFromString(value, type));
+ } catch (Exception e) {
+ LOG.debug("Failed to convert value " + value + " to type " +
type, e);
+ return null;
+ }
+ }
+ return converted;
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
index d3ff71e09..85a9f080e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
@@ -20,6 +20,9 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.utils.Preconditions;
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -27,6 +30,9 @@ import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.List;
+
/**
* A {@link ProcessFunction} to handle {@link SchemaChange}.
*
@@ -47,18 +53,90 @@ public class SchemaChangeProcessFunction extends
ProcessFunction<SchemaChange, V
public void processElement(
SchemaChange schemaChange, Context context, Collector<Void>
collector)
throws Exception {
- Preconditions.checkArgument(
- schemaChange instanceof SchemaChange.AddColumn,
- "Currently, only SchemaChange.AddColumn is supported.");
- try {
- schemaManager.commitChanges(schemaChange);
- } catch (Exception e) {
- // This is normal. For example when a table is split into multiple
database tables, all
- // these tables will be added the same column. However
schemaManager can't handle
- // duplicated column adds, so we just catch the exception and log
it.
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to perform schema change {}", schemaChange,
e);
+ if (schemaChange instanceof SchemaChange.AddColumn) {
+ try {
+ schemaManager.commitChanges(schemaChange);
+ } catch (IllegalArgumentException e) {
+ // This is normal. For example when a table is split into
multiple database tables,
+ // all these tables will be added the same column. However
schemaManager can't
+ // handle duplicated column adds, so we just catch the
exception and log it.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Failed to perform SchemaChange.AddColumn {}, "
+ + "possibly due to duplicated column name",
+ schemaChange,
+ e);
+ }
}
+ } else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
+ SchemaChange.UpdateColumnType updateColumnType =
+ (SchemaChange.UpdateColumnType) schemaChange;
+ TableSchema schema =
+ schemaManager
+ .latest()
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ "Table does not exist.
This is unexpected."));
+ int idx =
schema.fieldNames().indexOf(updateColumnType.fieldName());
+ Preconditions.checkState(
+ idx >= 0,
+ "Field name "
+ + updateColumnType.fieldName()
+ + " does not exist in table. This is unexpected.");
+ DataType oldType = schema.fields().get(idx).type();
+ DataType newType = updateColumnType.newDataType();
+ if (checkTypeConversion(oldType, newType)) {
+ schemaManager.commitChanges(schemaChange);
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Cannot convert field %s from type %s to %s",
+ updateColumnType.fieldName(), oldType,
newType));
+ }
+ } else if (schemaChange instanceof SchemaChange.UpdateColumnComment) {
+ schemaManager.commitChanges(schemaChange);
+ } else if (schemaChange instanceof
SchemaChange.UpdateColumnNullability) {
+ schemaManager.commitChanges(schemaChange);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported schema change class "
+ + schemaChange.getClass().getName()
+ + ", content "
+ + schemaChange);
+ }
+ }
+
+ private static final List<DataTypeRoot> STRING_TYPES =
+ Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
+ private static final List<DataTypeRoot> INTEGER_TYPES =
+ Arrays.asList(
+ DataTypeRoot.TINYINT,
+ DataTypeRoot.SMALLINT,
+ DataTypeRoot.INTEGER,
+ DataTypeRoot.BIGINT);
+ private static final List<DataTypeRoot> FLOATING_POINT_TYPES =
+ Arrays.asList(DataTypeRoot.FLOAT, DataTypeRoot.DOUBLE);
+
+ private boolean checkTypeConversion(DataType oldType, DataType newType) {
+ int oldIdx = STRING_TYPES.indexOf(oldType.getTypeRoot());
+ int newIdx = STRING_TYPES.indexOf(newType.getTypeRoot());
+ if (oldIdx >= 0 && newIdx >= 0) {
+ return true;
+ }
+
+ oldIdx = INTEGER_TYPES.indexOf(oldType.getTypeRoot());
+ newIdx = INTEGER_TYPES.indexOf(newType.getTypeRoot());
+ if (oldIdx >= 0 && newIdx >= 0) {
+ return oldIdx <= newIdx;
}
+
+ oldIdx = FLOATING_POINT_TYPES.indexOf(oldType.getTypeRoot());
+ newIdx = FLOATING_POINT_TYPES.indexOf(newType.getTypeRoot());
+ if (oldIdx >= 0 && newIdx >= 0) {
+ return oldIdx <= newIdx;
+ }
+
+ return false;
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
index eb78c1d07..d912119bc 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
@@ -89,20 +89,37 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
Map<Integer, Map<String, String>> expected = new HashMap<>();
List<String> fieldNames = new ArrayList<>();
+ List<Boolean> isBigInt = new ArrayList<>();
fieldNames.add("v0");
+ isBigInt.add(false);
int suffixId = 0;
for (int i = 0; i < numEvents; i++) {
if (schemaChangePositions.contains(i)) {
- suffixId++;
- String newName = "v" + suffixId;
- fieldNames.add(newName);
- events[i] = new TestCdcEvent(SchemaChange.addColumn(newName,
DataTypes.INT()));
+ if (random.nextBoolean()) {
+ int idx = random.nextInt(fieldNames.size());
+ isBigInt.set(idx, true);
+ events[i] =
+ new TestCdcEvent(
+ SchemaChange.updateColumnType(
+ fieldNames.get(idx),
DataTypes.BIGINT()));
+ } else {
+ suffixId++;
+ String newName = "v" + suffixId;
+ fieldNames.add(newName);
+ isBigInt.add(false);
+ events[i] = new
TestCdcEvent(SchemaChange.addColumn(newName, DataTypes.INT()));
+ }
} else {
Map<String, String> fields = new HashMap<>();
int key = random.nextInt(numKeys);
fields.put("k", String.valueOf(key));
- for (String fieldName : fieldNames) {
- fields.put(fieldName, String.valueOf(random.nextInt()));
+ for (int j = 0; j < fieldNames.size(); j++) {
+ String fieldName = fieldNames.get(j);
+ if (isBigInt.get(j)) {
+ fields.put(fieldName,
String.valueOf(random.nextLong()));
+ } else {
+ fields.put(fieldName,
String.valueOf(random.nextInt()));
+ }
}
List<CdcRecord> records = new ArrayList<>();
@@ -174,7 +191,12 @@ public class FlinkCdcSinkITCase extends AbstractTestBase {
Map<String, String> fields = new HashMap<>();
for (int i = 0; i < schema.fieldNames().size(); i++) {
if (!row.isNullAt(i)) {
- fields.put(schema.fieldNames().get(i),
String.valueOf(row.getInt(i)));
+ fields.put(
+ schema.fieldNames().get(i),
+ String.valueOf(
+
schema.fields().get(i).type().equals(DataTypes.BIGINT())
+ ? row.getLong(i)
+ : row.getInt(i)));
}
}
actual.put(Integer.valueOf(fields.get("k")), fields);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
index c80196ef5..0fb5068e3 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
@@ -52,6 +52,7 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
@@ -65,11 +66,6 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link SchemaAwareStoreWriteOperator}. */
public class SchemaAwareStoreWriteOperatorTest {
- private static final RowType ROW_TYPE =
- RowType.of(
- new DataType[] {DataTypes.INT(), DataTypes.BIGINT(),
DataTypes.STRING()},
- new String[] {"pt", "k", "v"});
-
@TempDir java.nio.file.Path tempDir;
private Path tablePath;
@@ -91,35 +87,21 @@ public class SchemaAwareStoreWriteOperatorTest {
@Test
@Timeout(30)
- public void testProcessRecord() throws Exception {
- FileStoreTable table = createFileStoreTable();
+ public void testAddColumn() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.BIGINT(),
DataTypes.STRING()},
+ new String[] {"pt", "k", "v"});
+
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType, Collections.singletonList("pt"),
Arrays.asList("pt", "k"));
OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness =
createTestHarness(table);
harness.open();
- BlockingQueue<CdcRecord> toProcess = new LinkedBlockingQueue<>();
- BlockingQueue<CdcRecord> processed = new LinkedBlockingQueue<>();
- AtomicBoolean running = new AtomicBoolean(true);
- Runnable r =
- () -> {
- long timestamp = 0;
- try {
- while (running.get()) {
- if (toProcess.isEmpty()) {
- Thread.sleep(10);
- continue;
- }
-
- CdcRecord record = toProcess.poll();
- harness.processElement(record, ++timestamp);
- processed.offer(record);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- };
-
- Thread t = new Thread(r);
+ Runner runner = new Runner(harness);
+ Thread t = new Thread(runner);
t.start();
// check that records with compatible schema can be processed
immediately
@@ -129,16 +111,16 @@ public class SchemaAwareStoreWriteOperatorTest {
fields.put("k", "1");
fields.put("v", "10");
CdcRecord expected = new CdcRecord(RowKind.INSERT, fields);
- toProcess.offer(expected);
- CdcRecord actual = processed.take();
+ runner.offer(expected);
+ CdcRecord actual = runner.take();
assertThat(actual).isEqualTo(expected);
fields = new HashMap<>();
fields.put("pt", "0");
fields.put("k", "2");
expected = new CdcRecord(RowKind.INSERT, fields);
- toProcess.offer(expected);
- actual = processed.take();
+ runner.offer(expected);
+ actual = runner.take();
assertThat(actual).isEqualTo(expected);
// check that records with new fields should be processed after schema
is updated
@@ -149,16 +131,80 @@ public class SchemaAwareStoreWriteOperatorTest {
fields.put("v", "30");
fields.put("v2", "300");
expected = new CdcRecord(RowKind.INSERT, fields);
- toProcess.offer(expected);
- actual = processed.poll(1, TimeUnit.SECONDS);
+ runner.offer(expected);
+ actual = runner.poll(1);
assertThat(actual).isNull();
SchemaManager schemaManager = new SchemaManager(table.fileIO(),
table.location());
schemaManager.commitChanges(SchemaChange.addColumn("v2",
DataTypes.INT()));
- actual = processed.take();
+ actual = runner.take();
+ assertThat(actual).isEqualTo(expected);
+
+ runner.stop();
+ t.join();
+ harness.close();
+ }
+
+ @Test
+ @Timeout(30)
+ public void testUpdateColumnType() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT(),
DataTypes.FLOAT()},
+ new String[] {"k", "v1", "v2"});
+
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType, Collections.emptyList(),
Collections.singletonList("k"));
+ OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness =
+ createTestHarness(table);
+ harness.open();
+
+ Runner runner = new Runner(harness);
+ Thread t = new Thread(runner);
+ t.start();
+
+ // check that records with compatible schema can be processed
immediately
+
+ Map<String, String> fields = new HashMap<>();
+ fields.put("k", "1");
+ fields.put("v1", "10");
+ fields.put("v2", "0.625");
+ CdcRecord expected = new CdcRecord(RowKind.INSERT, fields);
+ runner.offer(expected);
+ CdcRecord actual = runner.take();
+ assertThat(actual).isEqualTo(expected);
+
+ // check that records with new fields should be processed after schema
is updated
+
+ fields = new HashMap<>();
+ fields.put("k", "2");
+ fields.put("v1", "12345678987654321");
+ fields.put("v2", "0.25");
+ expected = new CdcRecord(RowKind.INSERT, fields);
+ runner.offer(expected);
+ actual = runner.poll(1);
+ assertThat(actual).isNull();
+
+ SchemaManager schemaManager = new SchemaManager(table.fileIO(),
table.location());
+ schemaManager.commitChanges(SchemaChange.updateColumnType("v1",
DataTypes.BIGINT()));
+ actual = runner.take();
+ assertThat(actual).isEqualTo(expected);
+
+ fields = new HashMap<>();
+ fields.put("k", "3");
+ fields.put("v1", "100");
+ fields.put("v2", "1.0000000000009095");
+ expected = new CdcRecord(RowKind.INSERT, fields);
+ runner.offer(expected);
+ actual = runner.poll(1);
+ assertThat(actual).isNull();
+
+ schemaManager.commitChanges(SchemaChange.updateColumnType("v2",
DataTypes.DOUBLE()));
+ actual = runner.take();
assertThat(actual).isEqualTo(expected);
- running.set(false);
+ runner.stop();
t.join();
harness.close();
}
@@ -180,19 +226,62 @@ public class SchemaAwareStoreWriteOperatorTest {
return harness;
}
- private FileStoreTable createFileStoreTable() throws Exception {
+ private FileStoreTable createFileStoreTable(
+ RowType rowType, List<String> partitions, List<String>
primaryKeys) throws Exception {
Options conf = new Options();
conf.set(SchemaAwareStoreWriteOperator.RETRY_SLEEP_TIME,
Duration.ofMillis(10));
TableSchema tableSchema =
SchemaUtils.forceCommit(
new SchemaManager(LocalFileIO.create(), tablePath),
- new Schema(
- ROW_TYPE.getFields(),
- Collections.singletonList("pt"),
- Arrays.asList("pt", "k"),
- conf.toMap(),
- ""));
+ new Schema(rowType.getFields(), partitions,
primaryKeys, conf.toMap(), ""));
return FileStoreTableFactory.create(LocalFileIO.create(), tablePath,
tableSchema);
}
+
+ private static class Runner implements Runnable {
+
+ private final OneInputStreamOperatorTestHarness<CdcRecord,
Committable> harness;
+ private final BlockingQueue<CdcRecord> toProcess = new
LinkedBlockingQueue<>();
+ private final BlockingQueue<CdcRecord> processed = new
LinkedBlockingQueue<>();
+ private final AtomicBoolean running = new AtomicBoolean(true);
+
+ private Runner(OneInputStreamOperatorTestHarness<CdcRecord,
Committable> harness) {
+ this.harness = harness;
+ }
+
+ private void offer(CdcRecord record) {
+ toProcess.offer(record);
+ }
+
+ private CdcRecord take() throws Exception {
+ return processed.take();
+ }
+
+ private CdcRecord poll(long seconds) throws Exception {
+ return processed.poll(seconds, TimeUnit.SECONDS);
+ }
+
+ private void stop() {
+ running.set(false);
+ }
+
+ @Override
+ public void run() {
+ long timestamp = 0;
+ try {
+ while (running.get()) {
+ if (toProcess.isEmpty()) {
+ Thread.sleep(10);
+ continue;
+ }
+
+ CdcRecord record = toProcess.poll();
+ harness.processElement(record, ++timestamp);
+ processed.offer(record);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
}