This is an automated email from the ASF dual-hosted git repository. hope pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit d29f5a685d040709d3bc76c7b55a25fc7a277edb Author: Faiz <[email protected]> AuthorDate: Wed Mar 25 22:06:20 2026 +0800 [flink][spark] path_to_descriptor function supports http url (#7529) --- .../paimon/flink/function/PathToDescriptor.java | 2 +- .../org/apache/paimon/flink/BlobTableITCase.java | 31 ++++++++++++++++++++++ .../spark/function/PathToDescriptorFunction.java | 2 +- 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/PathToDescriptor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/PathToDescriptor.java index 87d136f717..210116b9ca 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/PathToDescriptor.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/function/PathToDescriptor.java @@ -30,7 +30,7 @@ public class PathToDescriptor extends ScalarFunction { return null; } - BlobDescriptor descriptor = new BlobDescriptor(path, 0, Long.MAX_VALUE); + BlobDescriptor descriptor = new BlobDescriptor(path, 0, -1); return descriptor.serialize(); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java index 29938f36be..e490020709 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BlobTableITCase.java @@ -25,6 +25,7 @@ import org.apache.paimon.data.BlobRef; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.options.Options; +import org.apache.paimon.rest.TestHttpWebServer; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.UriReader; @@ -299,6 +300,36 @@ public class BlobTableITCase extends CatalogITCaseBase { assertThat(allColumns).hasSize(1); } + @Test + public void testWriteBlobWithHttpUrlDescriptor() throws Exception { + TestHttpWebServer httpServer = new TestHttpWebServer("/blob_data"); + httpServer.start(); + try { + String blobContent = "hello-http-blob"; + String httpUrl = httpServer.getBaseUrl(); + + // Enqueue response for the write phase + httpServer.enqueueResponse(blobContent, 200); + + // Use sys.path_to_descriptor with HTTP URL + batchSql( + "INSERT INTO blob_table_descriptor VALUES (1, 'http-blob', sys.path_to_descriptor('" + + httpUrl + + "'))"); + + // Read back with blob-as-descriptor=false to get raw data + batchSql("ALTER TABLE blob_table_descriptor SET ('blob-as-descriptor'='false')"); + List<Row> result = batchSql("SELECT * FROM blob_table_descriptor"); + assertThat(result).hasSize(1); + assertThat(result.get(0).getField(0)).isEqualTo(1); + assertThat(result.get(0).getField(1)).isEqualTo("http-blob"); + assertThat((byte[]) result.get(0).getField(2)) + .isEqualTo(blobContent.getBytes(java.nio.charset.StandardCharsets.UTF_8)); + } finally { + httpServer.stop(); + } + } + @Test public void testBlobTypeSchemaEquals() throws Exception { // Step 1: Create a Paimon table with blob field via Flink SQL diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java index ea6f306cc3..8c26b5eefd 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/function/PathToDescriptorFunction.java @@ -45,7 +45,7 @@ public class PathToDescriptorFunction implements ScalarFunction<byte[]>, Seriali return null; } - BlobDescriptor descriptor = new BlobDescriptor(path.toString(), 0, Long.MAX_VALUE); + BlobDescriptor descriptor = new BlobDescriptor(path.toString(), 0, -1); return descriptor.serialize(); }
