This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new daf482e286 [flink] Support creating table with blob in flink sql. (#6351)
daf482e286 is described below
commit daf482e2866cd0ec7c24700b27261d19a60106f9
Author: YeJunHao <[email protected]>
AuthorDate: Tue Sep 30 11:45:45 2025 +0800
[flink] Support creating table with blob in flink sql. (#6351)
---
.../shortcodes/generated/core_configuration.html | 6 +++
.../main/java/org/apache/paimon/CoreOptions.java | 6 +++
.../apache/paimon/io/RowDataRollingFileWriter.java | 37 +---------------
.../apache/paimon/io/RollingFileWriterTest.java | 2 +-
.../java/org/apache/paimon/flink/FlinkCatalog.java | 15 ++++++-
.../java/org/apache/paimon/flink/FlinkRowData.java | 2 +-
.../apache/paimon/flink/FlinkRowDataWithBlob.java | 37 ++++++++++++++++
.../apache/paimon/flink/LogicalTypeConversion.java | 12 ++++++
.../paimon/flink/source/FileStoreSourceReader.java | 7 +++-
.../flink/source/FileStoreSourceSplitReader.java | 27 ++++++++++--
.../apache/paimon/flink/source/FlinkSource.java | 4 +-
.../align/AlignedContinuousFileStoreSource.java | 8 +++-
.../flink/source/align/AlignedSourceReader.java | 6 ++-
.../org/apache/paimon/flink/BlobTableITCase.java | 47 +++++++++++++++++++++
.../flink/source/FileStoreSourceReaderTest.java | 1 +
.../source/FileStoreSourceSplitReaderTest.java | 3 +-
.../source/align/AlignedSourceReaderTest.java | 1 +
.../spark/commands/DataEvolutionPaimonWriter.scala | 7 ++++
.../MergeIntoPaimonDataEvolutionTable.scala | 49 +++++++++++++++++++---
19 files changed, 222 insertions(+), 55 deletions(-)
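
For context, a minimal usage sketch of what this commit enables, mirroring the DDL exercised by the new BlobTableITCase below. It assumes a Paimon catalog has already been created and set as the current catalog (catalog setup omitted); table and column names are only illustrative:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BlobTableExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // 'blob.field' marks the BYTES column to store as a blob; the FlinkCatalog change
        // below requires 'data-evolution.enabled' to be set alongside it.
        tEnv.executeSql(
                "CREATE TABLE blob_table (id INT, data STRING, picture BYTES) WITH ("
                        + "'row-tracking.enabled'='true', "
                        + "'data-evolution.enabled'='true', "
                        + "'blob.field'='picture')");
        tEnv.executeSql("INSERT INTO blob_table VALUES (1, 'paimon', X'48656C6C6F')");
    }
}
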
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 2eed98adaa..f2e90e5dce 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -50,6 +50,12 @@ under the License.
<td>Boolean</td>
<td>Whether to create underlying storage when reading and writing the table.</td>
</tr>
+ <tr>
+ <td><h5>blob.field</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify the blob field.</td>
+ </tr>
<tr>
<td><h5>bucket</h5></td>
<td style="word-wrap: break-word;">-1</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 500b16843d..e2046da926 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1989,6 +1989,12 @@ public class CoreOptions implements Serializable {
.defaultValue(false)
.withDescription("Format table file path only contain partition value.");
+ public static final ConfigOption<String> BLOB_FIELD =
+ key("blob.field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Specify the blob field.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index c309a10515..328bc193b9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -18,22 +18,17 @@
package org.apache.paimon.io;
-import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
-import org.apache.paimon.format.SimpleStatsCollector;
-import org.apache.paimon.format.avro.AvroFileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.LongCounter;
import javax.annotation.Nullable;
-import java.util.Arrays;
import java.util.List;
/** {@link RollingFileWriterImpl} for data files containing {@link InternalRow}. */
@@ -58,7 +53,7 @@ public class RowDataRollingFileWriter extends RollingFileWriterImpl<InternalRow,
() ->
new RowDataFileWriter(
fileIO,
- createFileWriterContext(
+ RollingFileWriter.createFileWriterContext(
fileFormat, writeSchema, statsCollectors, fileCompression),
pathFactory.newPath(),
writeSchema,
@@ -72,34 +67,4 @@ public class RowDataRollingFileWriter extends RollingFileWriterImpl<InternalRow,
writeCols),
targetFileSize);
}
-
- @VisibleForTesting
- static FileWriterContext createFileWriterContext(
- FileFormat fileFormat,
- RowType rowType,
- SimpleColStatsCollector.Factory[] statsCollectors,
- String fileCompression) {
- return new FileWriterContext(
- fileFormat.createWriterFactory(rowType),
- createStatsProducer(fileFormat, rowType, statsCollectors),
- fileCompression);
- }
-
- private static SimpleStatsProducer createStatsProducer(
- FileFormat fileFormat,
- RowType rowType,
- SimpleColStatsCollector.Factory[] statsCollectors) {
- boolean isDisabled =
- Arrays.stream(SimpleColStatsCollector.create(statsCollectors))
- .allMatch(p -> p instanceof NoneSimpleColStatsCollector);
- if (isDisabled) {
- return SimpleStatsProducer.disabledProducer();
- }
- if (fileFormat instanceof AvroFileFormat) {
- SimpleStatsCollector collector = new SimpleStatsCollector(rowType, statsCollectors);
- return SimpleStatsProducer.fromCollector(collector);
- }
- return SimpleStatsProducer.fromExtractor(
- fileFormat.createStatsExtractor(rowType, statsCollectors).orElse(null));
- }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index 8ac16aac2b..96adc5fc21 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -71,7 +71,7 @@ public class RollingFileWriterTest {
() ->
new RowDataFileWriter(
LocalFileIO.create(),
- RowDataRollingFileWriter.createFileWriterContext(
+ RollingFileWriter.createFileWriterContext(
fileFormat,
SCHEMA,
SimpleColStatsCollector.createFullStatsFactories(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index d9205c814f..cd259903f2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -151,6 +151,7 @@ import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP;
import static org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB;
import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
import static org.apache.paimon.flink.FlinkCatalogOptions.REGISTER_TIMEOUT;
+import static org.apache.paimon.flink.LogicalTypeConversion.toBlobType;
import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.flink.log.LogStoreRegister.registerLogSystem;
@@ -1050,6 +1051,16 @@ public class FlinkCatalog extends AbstractCatalog {
RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
Map<String, String> options = new HashMap<>(catalogTable.getOptions());
+ String blobName = options.get(CoreOptions.BLOB_FIELD.key());
+ if (blobName != null) {
+ checkArgument(
+ options.containsKey(CoreOptions.DATA_EVOLUTION_ENABLED.key()),
+ "When setting '"
+ + CoreOptions.BLOB_FIELD.key()
+ + "', you must also set '"
+ + CoreOptions.DATA_EVOLUTION_ENABLED.key()
+ + "'");
+ }
// Serialize virtual columns and watermark to the options
// This is what Flink SQL needs, the storage itself does not need them
options.putAll(columnOptions(schema));
@@ -1069,7 +1080,9 @@ public class FlinkCatalog extends AbstractCatalog {
field ->
schemaBuilder.column(
field.getName(),
- toDataType(field.getType()),
+ field.getName().equals(blobName)
+ ? toBlobType(field.getType())
+ : toDataType(field.getType()),
columnComments.get(field.getName())));
return schemaBuilder.build();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
index a4d81364b4..0447a0c6f9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowData.java
@@ -41,7 +41,7 @@ import static org.apache.paimon.flink.FlinkRowWrapper.fromFlinkRowKind;
/** Convert to Flink row data. */
public class FlinkRowData implements RowData {
- private InternalRow row;
+ protected InternalRow row;
public FlinkRowData(InternalRow row) {
this.row = row;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java
new file mode 100644
index 0000000000..541fa5bdf4
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowDataWithBlob.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.data.InternalRow;
+
+/** Convert to Flink row data with blob. */
+public class FlinkRowDataWithBlob extends FlinkRowData {
+
+ private final int blobField;
+
+ public FlinkRowDataWithBlob(InternalRow row, int blobField) {
+ super(row);
+ this.blobField = blobField;
+ }
+
+ @Override
+ public byte[] getBinary(int pos) {
+ return pos == blobField ? row.getBlob(pos).toData() : row.getBinary(pos);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
index 896f8d296e..8c7779e318 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/LogicalTypeConversion.java
@@ -18,13 +18,18 @@
package org.apache.paimon.flink;
+import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
+import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarBinaryType;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/** Conversion between {@link LogicalType} and {@link DataType}. */
public class LogicalTypeConversion {
@@ -37,6 +42,13 @@ public class LogicalTypeConversion {
return dataType.accept(DataTypeToLogicalType.INSTANCE);
}
+ public static BlobType toBlobType(LogicalType logicalType) {
+ checkArgument(
+ logicalType instanceof BinaryType || logicalType instanceof VarBinaryType,
+ "Expected BinaryType or VarBinaryType, but got: " + logicalType);
+ return new BlobType();
+ }
+
public static RowType toDataType(org.apache.flink.table.types.logical.RowType logicalType) {
return (RowType) toDataType(logicalType, new AtomicInteger(-1));
}
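
A minimal illustration of the conversion added above: a Flink BYTES column (VARBINARY with maximum length), when named by 'blob.field', is mapped to Paimon's BlobType instead of plain binary, while any other logical type fails the precondition. The snippet only calls the new method and assumes the paimon-flink and flink-table dependencies are on the classpath:

import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.types.BlobType;

public class ToBlobTypeExample {
    public static void main(String[] args) {
        // BYTES in Flink SQL is VARBINARY(Integer.MAX_VALUE).
        VarBinaryType bytesType = new VarBinaryType(Integer.MAX_VALUE);
        BlobType blobType = LogicalTypeConversion.toBlobType(bytesType);
        System.out.println(blobType); // prints the Paimon BLOB type
    }
}
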
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
index 622b186780..f7d7d86791 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
@@ -23,6 +23,7 @@ import org.apache.paimon.flink.NestedProjectedRowData;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.RowType;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
@@ -50,14 +51,16 @@ public class FileStoreSourceReader
FileStoreSourceReaderMetrics metrics,
IOManager ioManager,
@Nullable Long limit,
- @Nullable NestedProjectedRowData rowData) {
+ @Nullable NestedProjectedRowData rowData,
+ @Nullable RowType readType) {
// limiter is created in SourceReader, it can be shared in all split readers
super(
() ->
new FileStoreSourceSplitReader(
tableRead.withIOManager(ioManager),
RecordLimiter.create(limit),
- metrics),
+ metrics,
+ readType),
(element, output, state) ->
FlinkRecordsWithSplitIds.emitRecord(
readerContext, element, output, state, metrics, rowData),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
index 94e3c67b6c..7dba5b6015 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
@@ -20,12 +20,15 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.FlinkRowDataWithBlob;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReader.RecordIterator;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pool;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -77,12 +80,13 @@ public class FileStoreSourceSplitReader
public FileStoreSourceSplitReader(
TableRead tableRead,
@Nullable RecordLimiter limiter,
- FileStoreSourceReaderMetrics metrics) {
+ FileStoreSourceReaderMetrics metrics,
+ @Nullable RowType readType) {
this.tableRead = tableRead;
this.limiter = limiter;
this.splits = new LinkedList<>();
this.pool = new Pool<>(1);
- this.pool.add(new FileStoreRecordIterator());
+ this.pool.add(new FileStoreRecordIterator(readType));
this.paused = false;
this.metrics = metrics;
this.wakeup = new AtomicBoolean(false);
@@ -260,6 +264,20 @@ public class FileStoreSourceSplitReader
private final MutableRecordAndPosition<RowData> recordAndPosition =
new MutableRecordAndPosition<>();
+ @Nullable private final Integer blobField;
+
+ private FileStoreRecordIterator(@Nullable RowType rowType) {
+ this.blobField = rowType == null ? null : blobFieldIndex(rowType);
+ }
+
+ private Integer blobFieldIndex(RowType rowType) {
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) {
+ return i;
+ }
+ }
+ return null;
+ }
public FileStoreRecordIterator replace(RecordIterator<InternalRow> iterator) {
this.iterator = iterator;
@@ -283,7 +301,10 @@ public class FileStoreSourceSplitReader
return null;
}
- recordAndPosition.setNext(new FlinkRowData(row));
+ recordAndPosition.setNext(
+ blobField == null
+ ? new FlinkRowData(row)
+ : new FlinkRowDataWithBlob(row, blobField));
currentNumRead++;
if (limiter != null) {
limiter.increment();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
index 42da33cfa7..bfccb5a71c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
@@ -67,13 +67,15 @@ public abstract class FlinkSource
new FileStoreSourceReaderMetrics(metricGroup);
TableRead tableRead =
readBuilder.newRead().withMetricRegistry(new FlinkMetricRegistry(metricGroup));
+
return new FileStoreSourceReader(
context,
tableRead,
sourceReaderMetrics,
ioManager,
limit,
- NestedProjectedRowData.copy(rowData));
+ NestedProjectedRowData.copy(rowData),
+ readBuilder.readType());
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index 076175b31f..ca9b553036 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -65,7 +65,13 @@ public class AlignedContinuousFileStoreSource extends ContinuousFileStoreSource
FileStoreSourceReaderMetrics sourceReaderMetrics =
new FileStoreSourceReaderMetrics(context.metricGroup());
return new AlignedSourceReader(
- context, readBuilder.newRead(), sourceReaderMetrics, ioManager, limit, rowData);
+ context,
+ readBuilder.newRead(),
+ sourceReaderMetrics,
+ ioManager,
+ limit,
+ rowData,
+ readBuilder.readType());
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
index 1c3a489b1f..114b94d707 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
@@ -25,6 +25,7 @@ import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.FileStoreSourceSplitState;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ReflectionUtils;
import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
@@ -60,8 +61,9 @@ public class AlignedSourceReader extends FileStoreSourceReader
FileStoreSourceReaderMetrics metrics,
IOManager ioManager,
@Nullable Long limit,
- @Nullable NestedProjectedRowData rowData) {
- super(readerContext, tableRead, metrics, ioManager, limit, rowData);
+ @Nullable NestedProjectedRowData rowData,
+ @Nullable RowType readType) {
+ super(readerContext, tableRead, metrics, ioManager, limit, rowData, readType);
this.nextCheckpointId = null;
try {
// In lower versions of Flink, the SplitFetcherManager does not provide the getQueue
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
new file mode 100644
index 0000000000..412968468b
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test write and read table with blob type. */
+public class BlobTableITCase extends CatalogITCaseBase {
+
+ @Override
+ protected List<String> ddl() {
+ return Collections.singletonList(
+ "CREATE TABLE IF NOT EXISTS blob_table (id INT, data STRING, picture BYTES) WITH ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob.field'='picture')");
+ }
+
+ @Test
+ public void testBasic() {
+ batchSql("SELECT * FROM blob_table");
+ batchSql("INSERT INTO blob_table VALUES (1, 'paimon', X'48656C6C6F')");
+ assertThat(batchSql("SELECT * FROM blob_table"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, "paimon", new byte[] {72, 101, 108, 108, 111}));
+ assertThat(batchSql("SELECT file_path FROM `blob_table$files`").size()).isEqualTo(2);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
index f231b105fa..3900ffa6b4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
@@ -164,6 +164,7 @@ public class FileStoreSourceReaderTest {
new FileStoreSourceReaderMetrics(new DummyMetricGroup()),
IOManager.create(tempDir.toString()),
null,
+ null,
null);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
index 2afb7fa197..fbe1e72424 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
@@ -130,7 +130,8 @@ public class FileStoreSourceSplitReaderTest {
return new FileStoreSourceSplitReader(
tableRead,
limit == null ? null : new RecordLimiter(limit),
- new FileStoreSourceReaderMetrics(new DummyMetricGroup()));
+ new FileStoreSourceReaderMetrics(new DummyMetricGroup()),
+ null);
}
private void innerTestOnce(int skip) throws Exception {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
index 6ea389c161..8d3724822f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
@@ -223,6 +223,7 @@ public class AlignedSourceReaderTest extends FileStoreSourceReaderTest {
new FileStoreSourceReaderMetrics(new DummyMetricGroup()),
IOManager.create(tempDir.toString()),
null,
+ null,
null);
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
index d1c47ce4df..1bf95b059d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
@@ -25,6 +25,8 @@ import org.apache.paimon.spark.commands.DataEvolutionPaimonWriter.{deserializeCo
import org.apache.paimon.spark.write.WriteHelper
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink._
+import org.apache.paimon.types.DataType
+import org.apache.paimon.types.DataTypeRoot.BLOB
import org.apache.spark.sql._
@@ -61,6 +63,11 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable) extends WriteH
assert(data.columns.length == columnNames.size + 2)
val writeType = table.rowType().project(columnNames.asJava)
+ if (writeType.getFieldTypes.stream.anyMatch((t: DataType) => t.is(BLOB))) {
+ throw new UnsupportedOperationException(
+ "DataEvolution does not support writing partial columns mixed with BLOB type.")
+ }
+
val written =
data.mapPartitions {
iter =>
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index d26426d6c4..e29b765105 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.commands
+import org.apache.paimon.format.blob.BlobFileFormat.isBlobFile
import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.catalyst.analysis.PaimonRelation
import org.apache.paimon.spark.catalyst.analysis.PaimonUpdateTable.toColumn
@@ -75,17 +76,44 @@ case class MergeIntoPaimonDataEvolutionTable(
override val table: FileStoreTable =
v2Table.getTable.asInstanceOf[FileStoreTable]
private val firstRowIds: Seq[Long] = table
- .newSnapshotReader()
- .withManifestEntryFilter(entry => entry.file().firstRowId() != null)
- .read()
- .splits()
+ .store()
+ .newScan()
+ .withManifestEntryFilter(
+ entry =>
+ entry.file().firstRowId() != null && (!isBlobFile(
+ entry
+ .file()
+ .fileName())))
+ .plan()
+ .files()
.asScala
- .map(_.asInstanceOf[DataSplit])
- .flatMap(split => split.dataFiles().asScala.map(s => s.firstRowId().asInstanceOf[Long]))
+ .map(file => file.file().firstRowId().asInstanceOf[Long])
.distinct
.sorted
.toSeq
+ private val firstRowIdToBlobFirstRowIds = {
+ val map = new mutable.HashMap[Long, List[Long]]()
+ val files = table
+ .store()
+ .newScan()
+ .withManifestEntryFilter(entry => isBlobFile(entry.file().fileName()))
+ .plan()
+ .files()
+ .asScala
+ .sortBy(f => f.file().firstRowId())
+
+ for (file <- files) {
+ val firstRowId = file.file().firstRowId().asInstanceOf[Long]
+ val firstIdInNormalFile = floorBinarySearch(firstRowIds, firstRowId)
+ map.update(
+ firstIdInNormalFile,
+ map.getOrElseUpdate(firstIdInNormalFile, List.empty[Long]) :+ firstRowId
+ )
+ }
+ map
+ }
+
lazy val targetRelation: DataSourceV2Relation =
PaimonRelation.getPaimonRelation(targetTable)
lazy val sourceRelation: DataSourceV2Relation =
PaimonRelation.getPaimonRelation(sourceTable)
@@ -283,11 +311,20 @@ case class MergeIntoPaimonDataEvolutionTable(
identifier: String): Array[Long] = {
import sparkSession.implicits._
val firstRowIdsFinal = firstRowIds
+ val firstRowIdToBlobFirstRowIdsFinal = firstRowIdToBlobFirstRowIds
val firstRowIdUdf = udf((rowId: Long) =>
floorBinarySearch(firstRowIdsFinal, rowId))
dataset
.select(firstRowIdUdf(col(identifier)))
.distinct()
.as[Long]
+ .flatMap(
+ f => {
+ if (firstRowIdToBlobFirstRowIdsFinal.contains(f)) {
+ firstRowIdToBlobFirstRowIdsFinal(f)
+ } else {
+ Seq(f)
+ }
+ })
.collect()
}
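
The Scala changes above attribute each blob file to the normal data file whose firstRowId is the greatest value not exceeding the blob file's firstRowId, then expand the selected row-id groups to include those blob files. A minimal standalone Java sketch of that floor-grouping idea (this is not the Paimon/Spark helper itself; the ids are made up):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BlobGroupingSketch {
    // Floor lookup over a sorted list of normal-file firstRowIds.
    static long floor(List<Long> sortedFirstRowIds, long rowId) {
        long result = sortedFirstRowIds.get(0);
        for (long id : sortedFirstRowIds) {
            if (id <= rowId) {
                result = id;
            } else {
                break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> normalFirstRowIds = Arrays.asList(0L, 100L, 200L);
        List<Long> blobFirstRowIds = Arrays.asList(0L, 50L, 100L, 250L);
        Map<Long, List<Long>> grouped = new TreeMap<>();
        for (long blobId : blobFirstRowIds) {
            grouped.computeIfAbsent(floor(normalFirstRowIds, blobId), k -> new ArrayList<>()).add(blobId);
        }
        System.out.println(grouped); // {0=[0, 50], 100=[100], 200=[250]}
    }
}
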