This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch feature-branch-rfc100-unstructured-data
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to
refs/heads/feature-branch-rfc100-unstructured-data by this push:
new e940762f67c3 feat: Add HoodieBaseLanceFileWriter and implementation
for SparkFileWriter (#14131)
e940762f67c3 is described below
commit e940762f67c3b38438f32373feb8dac6d9117ff3
Author: Rahil C <[email protected]>
AuthorDate: Tue Oct 28 14:49:15 2025 -0700
feat: Add HoodieBaseLanceFileWriter and implementation for SparkFileWriter
(#14131)
---
.github/workflows/bot.yml | 371 +--------------
.github/workflows/maven_artifact_validation.yml | 12 +-
.github/workflows/release_candidate_validation.yml | 12 +-
azure-pipelines-20230430.yml | 105 ++---
hudi-client/hudi-spark-client/pom.xml | 6 +
.../hudi/io/storage/HoodieSparkLanceWriter.java | 147 ++++++
hudi-hadoop-common/pom.xml | 14 +
.../hudi/io/lance/HoodieBaseLanceWriter.java | 196 ++++++++
.../io/storage/TestHoodieSparkLanceWriter.java | 503 +++++++++++++++++++++
.../docker_java17/docker_java17_test.sh | 9 +-
pom.xml | 41 +-
11 files changed, 971 insertions(+), 445 deletions(-)
diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index ba2e4d996e18..2dce91058dd7 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -6,6 +6,7 @@ on:
- master
- 'release-*'
- branch-0.x
+ - feature-branch-rfc100-unstructured-data
pull_request:
paths-ignore:
- '**.bmp'
@@ -22,6 +23,7 @@ on:
- master
- 'release-*'
- branch-0.x
+ - feature-branch-rfc100-unstructured-data
concurrency:
group: ${{ github.ref }}
@@ -42,10 +44,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- - name: Set up JDK 8
+ - name: Set up JDK 11
uses: actions/setup-java@v3
with:
- java-version: '8'
+ java-version: '11'
distribution: 'temurin'
architecture: x64
cache: maven
@@ -59,281 +61,6 @@ jobs:
- name: RAT check
run: ./scripts/release/validate_source_rat.sh
- test-spark-java-tests-part1:
- runs-on: ubuntu-latest
- strategy:
- matrix:
- include:
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.3"
- sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
-
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.4"
- sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"
-
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.5"
- sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
-
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark3.5"
- sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
-
- steps:
- - uses: actions/checkout@v3
- - name: Set up JDK 8
- uses: actions/setup-java@v3
- with:
- java-version: '8'
- distribution: 'temurin'
- architecture: x64
- - name: Build Project
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_MODULES: ${{ matrix.sparkModules }}
- run:
- mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-DskipTests=true $MVN_ARGS -am -pl
"hudi-examples/hudi-examples-spark,$SPARK_COMMON_MODULES,$SPARK_MODULES"
- - name: Java UT 1 - Common & Spark
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_MODULES: ${{ matrix.sparkModules }}
- if: ${{ !endsWith(env.SPARK_PROFILE, '3.5') ||
!endsWith(env.SCALA_PROFILE, '2.12') }} # skip test Spark 3.5 and Scala 2.12 as
it's covered by Azure CI
- run:
- mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-DwildcardSuites=skipScalaTests $JAVA_UT_FILTER1 -DfailIfNoTests=false -pl
"$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
-
- test-spark-java-tests-part2:
- runs-on: ubuntu-latest
- strategy:
- matrix:
- include:
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.3"
- sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
-
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.4"
- sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"
-
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark3.5"
- sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
-
- steps:
- - uses: actions/checkout@v3
- - name: Set up JDK 8
- uses: actions/setup-java@v3
- with:
- java-version: '8'
- distribution: 'temurin'
- architecture: x64
- - name: Build Project
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_MODULES: ${{ matrix.sparkModules }}
- run:
- mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-DskipTests=true $MVN_ARGS -am -pl
"hudi-examples/hudi-examples-spark,$SPARK_COMMON_MODULES,$SPARK_MODULES"
- - name: Quickstart Test
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- run:
- mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl
hudi-examples/hudi-examples-spark $MVN_ARGS
- - name: Java UT 2 - Common & Spark
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_MODULES: ${{ matrix.sparkModules }}
- run:
- mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-DwildcardSuites=skipScalaTests $JAVA_UT_FILTER2 -DfailIfNoTests=false -pl
"$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
- - name: Java FTA - Spark
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_MODULES: ${{ matrix.sparkModules }}
- run:
- mvn test -Pfunctional-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-DwildcardSuites=skipScalaTests -DfailIfNoTests=false -pl
"$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
-
- test-spark-java-tests-part3:
- runs-on: ubuntu-latest
- strategy:
- matrix:
- include:
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.3"
- sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
-
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.4"
- sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"
-
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark3.5"
- sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
-
- steps:
- - uses: actions/checkout@v3
- - name: Set up JDK 8
- uses: actions/setup-java@v3
- with:
- java-version: '8'
- distribution: 'temurin'
- architecture: x64
- - name: Build Project
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_MODULES: ${{ matrix.sparkModules }}
- run:
- mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-DskipTests=true $MVN_ARGS -am -pl
"hudi-examples/hudi-examples-spark,$SPARK_COMMON_MODULES,$SPARK_MODULES"
- - name: Java FTB - Spark
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_MODULES: ${{ matrix.sparkModules }}
- run:
- mvn test -Pfunctional-tests-b -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-DwildcardSuites=skipScalaTests -DfailIfNoTests=false -pl
"$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
- - name: Java FTC - Spark
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_MODULES: ${{ matrix.sparkModules }}
- run:
- mvn test -Pfunctional-tests-c -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-DwildcardSuites=skipScalaTests -DfailIfNoTests=false -pl
"$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
-
- test-spark-scala-dml-tests:
- runs-on: ubuntu-latest
- strategy:
- matrix:
- include:
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.3"
- sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
-
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.4"
- sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"
-
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark3.5"
- sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
-
- steps:
- - uses: actions/checkout@v3
- - name: Set up JDK 8
- uses: actions/setup-java@v3
- with:
- java-version: '8'
- distribution: 'temurin'
- architecture: x64
- - name: Build Project
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_MODULES: ${{ matrix.sparkModules }}
- run:
- mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-DskipTests=true $MVN_ARGS -am -pl
"hudi-examples/hudi-examples-spark,$SPARK_COMMON_MODULES,$SPARK_MODULES"
- - name: Scala UT - Common & Spark
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_MODULES: ${{ matrix.sparkModules }}
- run:
- mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-Dtest=skipJavaTests $SCALA_TEST_DML_FILTER -DfailIfNoTests=false -pl
"$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
- - name: Scala FT - Spark
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_MODULES: ${{ matrix.sparkModules }}
- run:
- mvn test -Pfunctional-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-Dtest=skipJavaTests $SCALA_TEST_DML_FILTER -DfailIfNoTests=false -pl
"$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
-
- test-spark-scala-other-tests:
- runs-on: ubuntu-latest
- strategy:
- matrix:
- include:
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.3"
- sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
-
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.4"
- sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"
-
- - scalaProfile: "scala-2.13"
- sparkProfile: "spark3.5"
- sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
-
- steps:
- - uses: actions/checkout@v3
- - name: Set up JDK 8
- uses: actions/setup-java@v3
- with:
- java-version: '8'
- distribution: 'temurin'
- architecture: x64
- - name: Build Project
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_MODULES: ${{ matrix.sparkModules }}
- run:
- mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-DskipTests=true $MVN_ARGS -am -pl
"hudi-examples/hudi-examples-spark,$SPARK_COMMON_MODULES,$SPARK_MODULES"
- - name: Scala UT - Common & Spark
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_MODULES: ${{ matrix.sparkModules }}
- run:
- mvn test -Punit-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-Dtest=skipJavaTests $SCALA_TEST_OTHERS_FILTER -DfailIfNoTests=false -pl
"$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
- - name: Scala FT - Spark
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_MODULES: ${{ matrix.sparkModules }}
- run:
- mvn test -Pfunctional-tests -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-Dtest=skipJavaTests $SCALA_TEST_OTHERS_FILTER -DfailIfNoTests=false -pl
"$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
-
- test-hudi-hadoop-mr-and-hudi-java-client:
- runs-on: ubuntu-latest
- timeout-minutes: 40
- strategy:
- matrix:
- include:
- - scalaProfile: "scala-2.12"
- sparkProfile: "spark3.5"
- flinkProfile: "flink1.20"
-
- steps:
- - uses: actions/checkout@v3
- - name: Set up JDK 8
- uses: actions/setup-java@v3
- with:
- java-version: '8'
- distribution: 'temurin'
- architecture: x64
- - name: Generate Maven Wrapper
- run:
- mvn -N io.takari:maven:wrapper
- - name: Build Project
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- FLINK_PROFILE: ${{ matrix.flinkProfile }}
- run:
- ./mvnw clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-D"FLINK_PROFILE" -DskipTests=true -Phudi-platform-service $MVN_ARGS -am -pl
hudi-hadoop-mr,hudi-client/hudi-java-client
- - name: UT - hudi-hadoop-mr and hudi-client/hudi-java-client
- env:
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- FLINK_PROFILE: ${{ matrix.flinkProfile }}
- run:
- ./mvnw test -Punit-tests -fae -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-D"FLINK_PROFILE" -pl hudi-hadoop-mr,hudi-client/hudi-java-client $MVN_ARGS
-
test-spark-java17-java-tests-part1:
runs-on: ubuntu-latest
strategy:
@@ -892,10 +619,10 @@ jobs:
flinkParquetVersion: '1.13.1'
steps:
- uses: actions/checkout@v3
- - name: Set up JDK 8
+ - name: Set up JDK 11
uses: actions/setup-java@v3
with:
- java-version: '8'
+ java-version: '11'
distribution: 'temurin'
architecture: x64
- name: Build Project
@@ -958,88 +685,6 @@ jobs:
HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q
-DforceStdout)
./packaging/bundle-validation/run_docker_java17.sh
- validate-bundles:
- runs-on: ubuntu-latest
- strategy:
- matrix:
- include:
- - scalaProfile: 'scala-2.13'
- flinkProfile: 'flink1.20'
- flinkAvroVersion: '1.11.4'
- flinkParquetVersion: '1.13.1'
- sparkProfile: 'spark3.5'
- sparkRuntime: 'spark3.5.1'
- - scalaProfile: 'scala-2.13'
- flinkProfile: 'flink1.19'
- flinkAvroVersion: '1.11.4'
- flinkParquetVersion: '1.13.1'
- sparkProfile: 'spark3.5'
- sparkRuntime: 'spark3.5.1'
- - scalaProfile: 'scala-2.12'
- flinkProfile: 'flink1.18'
- flinkAvroVersion: '1.11.4'
- flinkParquetVersion: '1.13.1'
- sparkProfile: 'spark3.4'
- sparkRuntime: 'spark3.4.3'
- - scalaProfile: 'scala-2.12'
- flinkProfile: 'flink1.17'
- flinkAvroVersion: '1.11.4'
- flinkParquetVersion: '1.12.3'
- sparkProfile: 'spark3.3'
- sparkRuntime: 'spark3.3.4'
-
- steps:
- - uses: actions/checkout@v3
- - name: Set up JDK 8
- uses: actions/setup-java@v3
- with:
- java-version: '8'
- distribution: 'temurin'
- architecture: x64
- - name: Build Project
- env:
- FLINK_PROFILE: ${{ matrix.flinkProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- FLINK_AVRO_VERSION: ${{ matrix.flinkAvroVersion }}
- FLINK_PARQUET_VERSION: ${{ matrix.flinkParquetVersion }}
- run: |
- if [ "$SCALA_PROFILE" == "scala-2.13" ]; then
- mvn clean package -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-DdeployArtifacts=true -DskipTests=true $MVN_ARGS -pl
packaging/hudi-hadoop-mr-bundle,packaging/hudi-spark-bundle,packaging/hudi-utilities-bundle,packaging/hudi-utilities-slim-bundle,packaging/hudi-cli-bundle
-am
- else
- mvn clean package -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE"
-D"$FLINK_PROFILE" -DdeployArtifacts=true -DskipTests=true $MVN_ARGS
- # TODO remove the sudo below. It's a needed workaround as detailed
in HUDI-5708.
- sudo chown -R "$USER:$(id -g -n)"
hudi-platform-service/hudi-metaserver/target/generated-sources
- mvn clean package -T 2 -D"$SCALA_PROFILE" -D"$FLINK_PROFILE"
-DdeployArtifacts=true -DskipTests=true $MVN_ARGS -pl
packaging/hudi-flink-bundle -am -Davro.version="$FLINK_AVRO_VERSION"
-Dparquet.version="$FLINK_PARQUET_VERSION"
- fi
- - name: IT - Bundle Validation - OpenJDK 8
- env:
- FLINK_PROFILE: ${{ matrix.flinkProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- run: |
- HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q
-DforceStdout)
- ./packaging/bundle-validation/ci_run.sh hudi_docker_java8
$HUDI_VERSION openjdk8
- - name: IT - Bundle Validation - OpenJDK 11
- env:
- FLINK_PROFILE: ${{ matrix.flinkProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- run: |
- HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q
-DforceStdout)
- ./packaging/bundle-validation/ci_run.sh hudi_docker_java11
$HUDI_VERSION openjdk11
- - name: IT - Bundle Validation - OpenJDK 17
- env:
- FLINK_PROFILE: ${{ matrix.flinkProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- run: |
- HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q
-DforceStdout)
- ./packaging/bundle-validation/ci_run.sh hudi_docker_java17
$HUDI_VERSION openjdk17
-
validate-bundle-spark4:
runs-on: ubuntu-latest
strategy:
@@ -1191,10 +836,10 @@ jobs:
sparkArchive: 'spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz'
steps:
- uses: actions/checkout@v3
- - name: Set up JDK 8
+ - name: Set up JDK 11
uses: actions/setup-java@v3
with:
- java-version: '8'
+ java-version: '11'
distribution: 'temurin'
architecture: x64
- name: Check disk space
diff --git a/.github/workflows/maven_artifact_validation.yml
b/.github/workflows/maven_artifact_validation.yml
index 97f205e2405c..1c1ca0e169bd 100644
--- a/.github/workflows/maven_artifact_validation.yml
+++ b/.github/workflows/maven_artifact_validation.yml
@@ -44,21 +44,13 @@ jobs:
sparkRuntime: 'spark3.3.4'
steps:
- uses: actions/checkout@v3
- - name: Set up JDK 8
+ - name: Set up JDK 11
uses: actions/setup-java@v3
with:
- java-version: '8'
+ java-version: '11'
distribution: 'temurin'
architecture: x64
cache: maven
- - name: IT - Bundle Validation - OpenJDK 8
- env:
- FLINK_PROFILE: ${{ matrix.flinkProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- run: |
- ./packaging/bundle-validation/ci_run.sh hudi_docker_java8
$HUDI_VERSION openjdk8 "" $MAVEN_BASE_URL
- name: IT - Bundle Validation - OpenJDK 11
env:
FLINK_PROFILE: ${{ matrix.flinkProfile }}
diff --git a/.github/workflows/release_candidate_validation.yml
b/.github/workflows/release_candidate_validation.yml
index b60a909bfa4d..0a4dea66abeb 100644
--- a/.github/workflows/release_candidate_validation.yml
+++ b/.github/workflows/release_candidate_validation.yml
@@ -44,21 +44,13 @@ jobs:
sparkRuntime: 'spark3.3.4'
steps:
- uses: actions/checkout@v3
- - name: Set up JDK 8
+ - name: Set up JDK 11
uses: actions/setup-java@v3
with:
- java-version: '8'
+ java-version: '11'
distribution: 'temurin'
architecture: x64
cache: maven
- - name: IT - Bundle Validation - OpenJDK 8
- env:
- FLINK_PROFILE: ${{ matrix.flinkProfile }}
- SPARK_PROFILE: ${{ matrix.sparkProfile }}
- SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
- SCALA_PROFILE: ${{ matrix.scalaProfile }}
- run: |
- ./packaging/bundle-validation/ci_run.sh hudi_docker_java8
$HUDI_VERSION openjdk8 $STAGING_REPO_NUM
- name: IT - Bundle Validation - OpenJDK 11
env:
FLINK_PROFILE: ${{ matrix.flinkProfile }}
diff --git a/azure-pipelines-20230430.yml b/azure-pipelines-20230430.yml
index 957d55053ab3..8f3101a10abd 100644
--- a/azure-pipelines-20230430.yml
+++ b/azure-pipelines-20230430.yml
@@ -134,7 +134,7 @@ stages:
goals: 'clean install'
options: $(MVN_OPTS_INSTALL) -pl hudi-client/hudi-spark-client
-am
publishJUnitResults: false
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
- task: Maven@4
displayName: UT hudi-hadoop-common
inputs:
@@ -142,7 +142,7 @@ stages:
goals: 'test'
options: $(MVN_OPTS_TEST) -Punit-tests
$(JACOCO_AGENT_DESTFILE1_ARG) -pl hudi-hadoop-common
publishJUnitResults: false
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
mavenOptions: '-Xmx4g'
- task: Maven@4
displayName: UT client/spark-client
@@ -151,7 +151,7 @@ stages:
goals: 'test'
options: $(MVN_OPTS_TEST) -Punit-tests
$(JACOCO_AGENT_DESTFILE2_ARG) -pl hudi-client/hudi-spark-client
publishJUnitResults: false
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
mavenOptions: '-Xmx4g'
- task: Maven@4
displayName: FT client/spark-client
@@ -162,7 +162,7 @@ stages:
options: $(MVN_OPTS_TEST) -Pfunctional-tests
-Djacoco.agent.dest.filename=jacoco2.corrupt -pl hudi-client/hudi-spark-client
publishJUnitResults: true
testResultsFiles: '**/surefire-reports/TEST-*.xml'
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
mavenOptions: '-Xmx4g'
- script: |
./scripts/jacoco/download_jacoco.sh
@@ -188,7 +188,7 @@ stages:
goals: 'clean install'
options: $(MVN_OPTS_INSTALL) -pl
hudi-spark-datasource/hudi-spark -am
publishJUnitResults: false
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
- task: Maven@4
displayName: FTA hudi-spark-datasource/hudi-spark
inputs:
@@ -197,7 +197,7 @@ stages:
options: $(MVN_OPTS_TEST) -Pfunctional-tests
$(JACOCO_AGENT_DESTFILE1_ARG) -pl hudi-spark-datasource/hudi-spark
publishJUnitResults: true
testResultsFiles: '**/surefire-reports/TEST-*.xml'
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
mavenOptions: '-Xmx4g'
- script: |
./scripts/jacoco/download_jacoco.sh
@@ -223,7 +223,7 @@ stages:
goals: 'clean install'
options: $(MVN_OPTS_INSTALL) -pl $(JOB3456_MODULES) -am
publishJUnitResults: false
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
- task: Maven@4
displayName: Java UT spark-datasource functional package
inputs:
@@ -232,7 +232,7 @@ stages:
options: $(MVN_OPTS_TEST) -Punit-tests $(JAVA_MVN_TEST_FILTER)
$(MVN_ARG_FUNCTIONAL_PACKAGE_TEST) $(JACOCO_AGENT_DESTFILE2_ARG) -pl
$(JOB3456_MODULES)
publishJUnitResults: true
testResultsFiles: '**/surefire-reports/TEST-*.xml'
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
mavenOptions: '-Xmx4g'
- script: |
./scripts/jacoco/download_jacoco.sh
@@ -258,7 +258,7 @@ stages:
goals: 'clean install'
options: $(MVN_OPTS_INSTALL) -pl $(JOB3456_MODULES) -am
publishJUnitResults: false
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
- task: Maven@4
displayName: Java UT spark-datasource non-functional package
inputs:
@@ -267,7 +267,7 @@ stages:
options: $(MVN_OPTS_TEST) -Punit-tests $(JAVA_MVN_TEST_FILTER)
$(MVN_ARG_NON_FUNCTIONAL_PACKAGE_TEST) $(JACOCO_AGENT_DESTFILE1_ARG) -pl
$(JOB3456_MODULES)
publishJUnitResults: true
testResultsFiles: '**/surefire-reports/TEST-*.xml'
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
mavenOptions: '-Xmx4g'
- script: |
./scripts/jacoco/download_jacoco.sh
@@ -293,7 +293,7 @@ stages:
goals: 'clean install'
options: $(MVN_OPTS_INSTALL) -pl $(JOB3456_MODULES) -am
publishJUnitResults: false
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
- task: Maven@4
displayName: Scala UT spark-datasource DML 1
inputs:
@@ -302,7 +302,7 @@ stages:
options: $(MVN_OPTS_TEST) -Punit-tests $(SCALA_MVN_TEST_FILTER)
-DwildcardSuites="org.apache.spark.sql.hudi.dml.others"
$(JACOCO_AGENT_DESTFILE1_ARG) -pl $(JOB3456_MODULES)
publishJUnitResults: true
testResultsFiles: '**/surefire-reports/TEST-*.xml'
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
mavenOptions: '-Xmx4g'
- script: |
./scripts/jacoco/download_jacoco.sh
@@ -328,7 +328,7 @@ stages:
goals: 'clean install'
options: $(MVN_OPTS_INSTALL) -pl $(JOB3456_MODULES) -am
publishJUnitResults: false
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
- task: Maven@4
displayName: Scala UT spark-datasource DDL & Others
inputs:
@@ -337,7 +337,7 @@ stages:
options: $(MVN_OPTS_TEST) -Punit-tests $(SCALA_MVN_TEST_FILTER)
-DwildcardSuites="$(JOB6_SPARK_DDL_OTHERS_WILDCARD_SUITES)"
$(JACOCO_AGENT_DESTFILE1_ARG) -pl $(JOB3456_MODULES)
publishJUnitResults: true
testResultsFiles: '**/surefire-reports/TEST-*.xml'
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
mavenOptions: '-Xmx4g'
- script: |
./scripts/jacoco/download_jacoco.sh
@@ -353,41 +353,36 @@ stages:
grep "testcase" */target/surefire-reports/*.xml
*/*/target/surefire-reports/*.xml | awk -F'"' ' { print $6,$4,$2 } ' | sort -nr
| head -n 100
displayName: Top 100 long-running testcases
- job: UT_FT_7
- displayName: UT Hudi Streamer & FT utilities
+ displayName: UT Hudi Streamer & FT utilities (Java 11)
timeoutInMinutes: '90'
steps:
- - task: Docker@2
- displayName: "login to docker hub"
- inputs:
- command: "login"
- containerRegistry: "apachehudi-docker-hub"
- - task: Docker@2
- displayName: "load repo into image"
+ - task: Maven@4
+ displayName: maven install
inputs:
- containerRegistry: 'apachehudi-docker-hub'
- repository: 'apachehudi/hudi-ci-bundle-validation-base'
- command: 'build'
- Dockerfile: '**/Dockerfile'
- ImageName: $(Build.BuildId)
- - task: Docker@2
- displayName: "UT Hudi Streamer & FT utilities"
+ mavenPomFile: 'pom.xml'
+ goals: 'clean install'
+ options: $(MVN_OPTS_INSTALL) -Phudi-platform-service
-Pthrift-gen-source -pl hudi-utilities -am
+ publishJUnitResults: false
+ jdkVersionOption: '1.11'
+ - task: Maven@4
+ displayName: UT Hudi Streamer (TestHoodie*)
inputs:
- containerRegistry: 'apachehudi-docker-hub'
- repository: 'apachehudi/hudi-ci-bundle-validation-base'
- command: 'run'
- arguments: >
- -v $(Build.SourcesDirectory):/hudi
- -i
docker.io/apachehudi/hudi-ci-bundle-validation-base:$(Build.BuildId)
- /bin/bash -c "mvn clean install $(MVN_OPTS_INSTALL)
-Phudi-platform-service -Pthrift-gen-source -pl hudi-utilities -am
- && mvn test $(MVN_OPTS_TEST) -Punit-tests
$(JACOCO_AGENT_DESTFILE1_ARG) -Dtest="TestHoodie*" -DfailIfNoTests=false
-DargLine="-Xmx4g" -pl hudi-utilities
- && mvn test $(MVN_OPTS_TEST) -Pfunctional-tests
$(JACOCO_AGENT_DESTFILE2_ARG) -DfailIfNoTests=false -DargLine="-Xmx4g" -pl
hudi-utilities"
- - task: PublishTestResults@2
- displayName: 'Publish Test Results'
+ mavenPomFile: 'pom.xml'
+ goals: 'test'
+ options: $(MVN_OPTS_TEST) -Punit-tests
$(JACOCO_AGENT_DESTFILE1_ARG) -Dtest="TestHoodie*" -DfailIfNoTests=false -pl
hudi-utilities
+ publishJUnitResults: false
+ jdkVersionOption: '1.11'
+ mavenOptions: '-Xmx4g'
+ - task: Maven@4
+ displayName: FT utilities
inputs:
- testResultsFormat: 'JUnit'
+ mavenPomFile: 'pom.xml'
+ goals: 'test'
+ options: $(MVN_OPTS_TEST) -Pfunctional-tests
$(JACOCO_AGENT_DESTFILE2_ARG) -DfailIfNoTests=false -pl hudi-utilities
+ publishJUnitResults: true
testResultsFiles: '**/surefire-reports/TEST-*.xml'
- searchFolder: '$(Build.SourcesDirectory)'
- failTaskOnFailedTests: true
+ jdkVersionOption: '1.11'
+ mavenOptions: '-Xmx4g'
- script: |
./scripts/jacoco/download_jacoco.sh
./scripts/jacoco/merge_jacoco_exec_files.sh
jacoco-lib/lib/jacococli.jar $(Build.SourcesDirectory)
@@ -412,7 +407,7 @@ stages:
goals: 'clean install'
options: $(MVN_OPTS_INSTALL) -pl $(JOB3456_MODULES) -am
publishJUnitResults: false
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
- task: Maven@4
displayName: Scala UT spark-datasource Hudi SQL features
inputs:
@@ -420,7 +415,7 @@ stages:
goals: 'test'
options: $(MVN_OPTS_TEST) -Punit-tests $(SCALA_MVN_TEST_FILTER)
-DwildcardSuites="org.apache.spark.sql.hudi.feature"
$(JACOCO_AGENT_DESTFILE1_ARG) -pl $(JOB3456_MODULES)
publishJUnitResults: false
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
mavenOptions: '-Xmx4g'
- task: Maven@4
displayName: Scala UT spark-datasource DML 2
@@ -429,7 +424,7 @@ stages:
goals: 'test'
options: $(MVN_OPTS_TEST) -Punit-tests $(SCALA_MVN_TEST_FILTER)
-DwildcardSuites="org.apache.spark.sql.hudi.dml.insert"
$(JACOCO_AGENT_DESTFILE2_ARG) -pl $(JOB3456_MODULES)
publishJUnitResults: false
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
mavenOptions: '-Xmx4g'
- task: Maven@4
displayName: FTC hudi-spark-datasource/hudi-spark
@@ -439,7 +434,7 @@ stages:
options: $(MVN_OPTS_TEST) -Pfunctional-tests-c
$(JACOCO_AGENT_DESTFILE3_ARG) -pl hudi-spark-datasource/hudi-spark
publishJUnitResults: true
testResultsFiles: '**/surefire-reports/TEST-*.xml'
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
mavenOptions: '-Xmx4g'
- script: |
./scripts/jacoco/download_jacoco.sh
@@ -465,7 +460,7 @@ stages:
goals: 'clean install'
options: $(MVN_OPTS_INSTALL) -pl
hudi-client/hudi-spark-client,hudi-spark-datasource/hudi-spark -am
publishJUnitResults: false
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
- task: Maven@4
displayName: FTB hudi-spark-datasource/hudi-spark
inputs:
@@ -474,7 +469,7 @@ stages:
options: $(MVN_OPTS_TEST) -Pfunctional-tests-b
$(JACOCO_AGENT_DESTFILE1_ARG) -pl hudi-spark-datasource/hudi-spark
publishJUnitResults: true
testResultsFiles: '**/surefire-reports/TEST-*.xml'
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
mavenOptions: '-Xmx4g'
- script: |
./scripts/jacoco/download_jacoco.sh
@@ -515,10 +510,10 @@ stages:
arguments: >
-v $(Build.SourcesDirectory):/hudi
-i
docker.io/apachehudi/hudi-ci-bundle-validation-base:$(Build.BuildId)
- /bin/bash -c "mvn clean install $(MVN_OPTS_INSTALL)
-Phudi-platform-service -Pthrift-gen-source
- && mvn test $(MVN_OPTS_TEST) -Punit-tests
-DfailIfNoTests=false -DargLine="-Xmx4g" $(JACOCO_AGENT_DESTFILE1_ARG) -pl
$(JOB10_UT_MODULES)
- && mvn test $(MVN_OPTS_TEST) -Punit-tests
$(JACOCO_AGENT_DESTFILE2_ARG) -Dtest="!TestHoodie*" -DfailIfNoTests=false
-DargLine="-Xmx4g" -pl hudi-utilities
- && mvn test $(MVN_OPTS_TEST) -Pfunctional-tests
-DfailIfNoTests=false -DargLine="-Xmx4g" $(JACOCO_AGENT_DESTFILE3_ARG) -pl
$(JOB10_FT_MODULES)"
+ /bin/bash -c "mvn clean install $(MVN_OPTS_INSTALL)
-Djava.version=11 -Phudi-platform-service -Pthrift-gen-source
+ && mvn test $(MVN_OPTS_TEST) -Djava.version=11 -Punit-tests
-DfailIfNoTests=false -DargLine="-Xmx4g" $(JACOCO_AGENT_DESTFILE1_ARG) -pl
$(JOB10_UT_MODULES)
+ && mvn test $(MVN_OPTS_TEST) -Djava.version=11 -Punit-tests
$(JACOCO_AGENT_DESTFILE2_ARG) -Dtest="!TestHoodie*" -DfailIfNoTests=false
-DargLine="-Xmx4g" -pl hudi-utilities
+ && mvn test $(MVN_OPTS_TEST) -Djava.version=11
-Pfunctional-tests -DfailIfNoTests=false -DargLine="-Xmx4g"
$(JACOCO_AGENT_DESTFILE3_ARG) -pl $(JOB10_FT_MODULES)"
- task: PublishTestResults@2
displayName: 'Publish Test Results'
inputs:
@@ -563,7 +558,7 @@ stages:
**/merged-jacoco-$(Build.BuildId)-*/*.exec
- task: JavaToolInstaller@0
inputs:
- versionSpec: '8'
+ versionSpec: '11'
jdkArchitectureOption: 'x64'
jdkSourceOption: 'PreInstalled'
- script: |
@@ -583,7 +578,7 @@ stages:
goals: 'clean package'
options: $(MVN_OPTS_INSTALL) -Pcopy-files-for-jacoco -pl
$(JACOCO_MODULES)
publishJUnitResults: false
- jdkVersionOption: '1.8'
+ jdkVersionOption: '1.11'
- script: |
./scripts/jacoco/generate_jacoco_coverage_report.sh
jacoco-lib/lib/jacococli.jar $(Build.SourcesDirectory)
displayName: 'Generate JaCoCo Code Coverage Report'
diff --git a/hudi-client/hudi-spark-client/pom.xml
b/hudi-client/hudi-spark-client/pom.xml
index e9266904002c..3cc8bf3b5513 100644
--- a/hudi-client/hudi-spark-client/pom.xml
+++ b/hudi-client/hudi-spark-client/pom.xml
@@ -94,6 +94,12 @@
<artifactId>parquet-avro</artifactId>
</dependency>
+ <!-- Lance Spark for Arrow conversion -->
+ <dependency>
+ <groupId>com.lancedb</groupId>
+ <artifactId>${lance.spark.artifact}</artifactId>
+ </dependency>
+
<!-- Used for adding kryo serializers for protobuf -->
<dependency>
<groupId>com.twitter</groupId>
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
new file mode 100644
index 000000000000..17de5c334f8d
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import com.lancedb.lance.spark.arrow.LanceArrowWriter;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.io.lance.HoodieBaseLanceWriter;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.LanceArrowUtils;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD;
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD;
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.FILENAME_METADATA_FIELD;
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD;
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD;
+
+/**
+ * Spark Lance file writer implementing {@link HoodieSparkFileWriter}.
+ *
+ * This writer integrates with Hudi's storage I/O layer and supports:
+ * - Hudi metadata field population
+ * - Record key tracking (for bloom filters - TODO)
+ * - Sequence ID generation
+ */
+public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow>
implements HoodieSparkFileWriter {
+
+ private static final long DEFAULT_MAX_FILE_SIZE = 120 * 1024 * 1024; // 120MB
+ private static final String DEFAULT_TIMEZONE = "UTC";
+
+ private final StructType sparkSchema;
+ private final Schema arrowSchema;
+ private final UTF8String fileName;
+ private final UTF8String instantTime;
+ private final boolean populateMetaFields;
+ private final Function<Long, String> seqIdGenerator;
+ private LanceArrowWriter writer;
+
+ /**
+ * Constructor for Spark Lance writer.
+ *
+ * @param file Path where Lance file will be written
+ * @param sparkSchema Spark schema for the data
+ * @param instantTime Instant time for the commit
+ * @param taskContextSupplier Task context supplier for partition ID
+ * @param storage HoodieStorage instance
+ * @param populateMetaFields Whether to populate Hudi metadata fields
+ * @throws IOException if writer initialization fails
+ */
+ public HoodieSparkLanceWriter(StoragePath file,
+ StructType sparkSchema,
+ String instantTime,
+ TaskContextSupplier taskContextSupplier,
+ HoodieStorage storage,
+ boolean populateMetaFields) throws IOException
{
+ super(storage, file, DEFAULT_BATCH_SIZE, DEFAULT_MAX_FILE_SIZE);
+ this.sparkSchema = sparkSchema;
+ this.arrowSchema = LanceArrowUtils.toArrowSchema(sparkSchema,
DEFAULT_TIMEZONE, true, false);
+ this.fileName = UTF8String.fromString(file.getName());
+ this.instantTime = UTF8String.fromString(instantTime);
+ this.populateMetaFields = populateMetaFields;
+ this.seqIdGenerator = recordIndex -> {
+ Integer partitionId = taskContextSupplier.getPartitionIdSupplier().get();
+ return HoodieRecord.generateSequenceId(instantTime, partitionId,
recordIndex);
+ };
+ }
+
+ @Override
+ public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws
IOException {
+ if (populateMetaFields) {
+ UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
+ updateRecordMetadata(row, recordKey, key.getPartitionPath(),
getWrittenRecordCount());
+ super.write(row);
+ } else {
+ super.write(row);
+ }
+ }
+
+ @Override
+ public void writeRow(String recordKey, InternalRow row) throws IOException {
+ super.write(row);
+ }
+
+ @Override
+ protected void populateVectorSchemaRoot(List<InternalRow> records) {
+ if (writer == null) {
+ writer = LanceArrowWriter.create(this.root, sparkSchema);
+ }
+ // Reset writer state from previous batch
+ writer.reset();
+ for (InternalRow record : records) {
+ writer.write(record);
+ }
+ // Finalize the writer (sets row count)
+ writer.finish();
+ }
+
+ @Override
+ protected Schema getArrowSchema() {
+ return arrowSchema;
+ }
+
+ /**
+ * Update Hudi metadata fields in the InternalRow.
+ *
+ * @param row InternalRow to update
+ * @param recordKey Record key
+ * @param partitionPath Partition path
+ * @param recordCount Current record count for sequence ID generation
+ */
+ protected void updateRecordMetadata(InternalRow row,
+ UTF8String recordKey,
+ String partitionPath,
+ long recordCount) {
+ row.update(COMMIT_TIME_METADATA_FIELD.ordinal(), instantTime);
+ row.update(COMMIT_SEQNO_METADATA_FIELD.ordinal(),
UTF8String.fromString(seqIdGenerator.apply(recordCount)));
+ row.update(RECORD_KEY_METADATA_FIELD.ordinal(), recordKey);
+ row.update(PARTITION_PATH_METADATA_FIELD.ordinal(),
UTF8String.fromString(partitionPath));
+ row.update(FILENAME_METADATA_FIELD.ordinal(), fileName);
+ }
+}
\ No newline at end of file
diff --git a/hudi-hadoop-common/pom.xml b/hudi-hadoop-common/pom.xml
index 01d533cf444d..c075097f62fe 100644
--- a/hudi-hadoop-common/pom.xml
+++ b/hudi-hadoop-common/pom.xml
@@ -143,5 +143,19 @@
<version>1.17.2</version>
<scope>test</scope>
</dependency>
+ <!-- Lance Core SDK -->
+ <dependency>
+ <groupId>com.lancedb</groupId>
+ <artifactId>lance-core</artifactId>
+ </dependency>
+ <!-- Apache Arrow for Lance data representation -->
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-netty</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
new file mode 100644
index 000000000000..6860c019cf6c
--- /dev/null
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/lance/HoodieBaseLanceWriter.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.lance;
+
+import com.lancedb.lance.file.LanceFileWriter;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base class for Hudi Lance file writers supporting different record types.
+ *
+ * This class handles common Lance file operations including:
+ * - LanceFileWriter lifecycle management
+ * - BufferAllocator management
+ * - Record buffering and batch flushing
+ * - File size checks
+ *
+ * Subclasses must implement type-specific conversion to Arrow format.
+ *
+ * @param <R> The record type (e.g., GenericRecord, InternalRow)
+ */
+@NotThreadSafe
+public abstract class HoodieBaseLanceWriter<R> implements Closeable {
+ protected static final int DEFAULT_BATCH_SIZE = 1000;
+ protected final HoodieStorage storage;
+ protected final StoragePath path;
+ protected final BufferAllocator allocator;
+ protected final List<R> bufferedRecords;
+ protected final int batchSize;
+ protected final long maxFileSize;
+ protected long writtenRecordCount = 0;
+ protected VectorSchemaRoot root;
+
+ private LanceFileWriter writer;
+
+ /**
+ * Constructor for base Lance writer.
+ *
+ * @param storage HoodieStorage instance
+ * @param path Path where Lance file will be written
+ * @param batchSize Number of records to buffer before flushing to Lance
+ * @param maxFileSize Maximum file size in bytes before rolling over to new
file
+ */
+ protected HoodieBaseLanceWriter(HoodieStorage storage, StoragePath path, int
batchSize, long maxFileSize) {
+ this.storage = storage;
+ this.path = path;
+ this.allocator = new RootAllocator(Long.MAX_VALUE);
+ this.bufferedRecords = new ArrayList<>(batchSize);
+ this.batchSize = batchSize;
+ this.maxFileSize = maxFileSize;
+ }
+
+ /**
+ * Populate the VectorSchemaRoot with buffered records.
+ * Subclasses must implement type-specific conversion logic.
+ * The VectorSchemaRoot field is reused across batches and managed by this
base class.
+ *
+ * @param records List of records to convert
+ */
+ protected abstract void populateVectorSchemaRoot(List<R> records);
+
+ /**
+ * Get the Arrow schema for this writer.
+ * Subclasses must provide the Arrow schema corresponding to their record
type.
+ *
+ * @return Arrow schema
+ */
+ protected abstract Schema getArrowSchema();
+
+ /**
+ * Write a single record. Records are buffered and flushed in batches.
+ *
+ * @param record Record to write
+ * @throws IOException if write fails
+ */
+ public void write(R record) throws IOException {
+ bufferedRecords.add(record);
+ writtenRecordCount++;
+
+ if (bufferedRecords.size() >= batchSize) {
+ flushBatch();
+ }
+ }
+
+ /**
+ * Check if writer can accept more records based on file size.
+ * NOTE: size-based checking is not yet implemented; this currently always returns true.
+ *
+ * @return true if writer can accept more records, false if file size limit
reached
+ */
+ public boolean canWrite() {
+ //TODO will need to implement proper way to compute this
+ return true;
+ }
+
+ /**
+ * Get the total number of records written so far.
+ *
+ * @return Number of records written
+ */
+ public long getWrittenRecordCount() {
+ return writtenRecordCount;
+ }
+
+ /**
+ * Close the writer, flushing any remaining buffered records.
+ *
+ * @throws IOException if close fails
+ */
+ @Override
+ public void close() throws IOException {
+ try {
+ // Flush any remaining buffered records
+ if (!bufferedRecords.isEmpty()) {
+ flushBatch();
+ }
+
+ // Close Lance writer
+ if (writer != null) {
+ writer.close();
+ }
+
+ // Close VectorSchemaRoot
+ if (root != null) {
+ root.close();
+ }
+ } catch (Exception e) {
+ throw new HoodieException("Failed to close Lance writer: " + path, e);
+ } finally {
+ // Always close allocator
+ allocator.close();
+ }
+ }
+
+ /**
+ * Flush buffered records to Lance file.
+ */
+ private void flushBatch() throws IOException {
+ if (bufferedRecords.isEmpty()) {
+ return;
+ }
+
+ // Lazy initialization of writer and root
+ if (writer == null) {
+ initializeWriter();
+ }
+ if (root == null) {
+ root = VectorSchemaRoot.create(getArrowSchema(), allocator);
+ }
+
+ // Reset root state for new batch
+ root.setRowCount(0);
+
+ // Populate root with records and write to Lance
+ populateVectorSchemaRoot(bufferedRecords);
+ writer.write(root);
+
+ // Clear buffer
+ bufferedRecords.clear();
+ }
+
+ /**
+ * Initialize LanceFileWriter (lazy initialization).
+ */
+ private void initializeWriter() throws IOException {
+ writer = LanceFileWriter.open(path.toString(), allocator, null);
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
new file mode 100644
index 000000000000..1dad1d03b6a4
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/TestHoodieSparkLanceWriter.java
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import com.lancedb.lance.file.LanceFileReader;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD;
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD;
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.FILENAME_METADATA_FIELD;
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD;
+import static
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieSparkLanceWriter}.
+ */
+@DisabledIfSystemProperty(named = "lance.skip.tests", matches = "true")
+public class TestHoodieSparkLanceWriter {
+
+ @TempDir
+ File tempDir;
+
+ private HoodieStorage storage;
+ private SparkTaskContextSupplier taskContextSupplier;
+ private String instantTime;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ storage = HoodieTestUtils.getStorage(tempDir.getAbsolutePath());
+ taskContextSupplier = new SparkTaskContextSupplier();
+ instantTime = "20251201120000000";
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (storage != null) {
+ storage.close();
+ }
+ }
+
+ @Test
+ public void testWriteRowWithMetadataPopulation() throws Exception {
+ // Schema WITH meta fields (writer expects this when
populateMetaFields=true)
+ StructType schema = createSchemaWithMetaFields();
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_with_metadata.lance");
+ HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+ path, schema, instantTime, taskContextSupplier, storage, true);
+
+ // Create row with PLACEHOLDER meta fields + user data
+ InternalRow row = createRowWithMetaFields(1, "Alice", 30L);
+ HoodieKey key = new HoodieKey("key1", "partition1");
+
+ writer.writeRowWithMetadata(key, row);
+ writer.close();
+
+ // Verify using LanceFileReader
+ assertTrue(storage.exists(path), "Lance file should exist");
+
+ try (BufferAllocator allocator = new RootAllocator();
+ LanceFileReader reader = LanceFileReader.open(path.toString(),
allocator)) {
+
+ assertEquals(1, reader.numRows(), "Should have 1 record");
+
+ // Schema should have 5 meta fields + 3 user fields = 8 fields
+ assertEquals(8, reader.schema().getFields().size(), "Should have 8
fields");
+
+ // Read and verify data
+ ArrowReader arrowReader = reader.readAll(null, null, Integer.MAX_VALUE);
+ assertTrue(arrowReader.loadNextBatch(), "Should load batch");
+ VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+
+ // Verify meta fields were populated
+ VarCharVector commitTimeVector = (VarCharVector)
root.getVector(COMMIT_TIME_METADATA_FIELD.getFieldName());
+ assertNotNull(commitTimeVector);
+ assertEquals(instantTime, new String(commitTimeVector.get(0)), "Commit
time should match");
+
+ VarCharVector recordKeyVector = (VarCharVector)
root.getVector(RECORD_KEY_METADATA_FIELD.getFieldName());
+ assertEquals("key1", new String(recordKeyVector.get(0)), "Record key
should match");
+
+ VarCharVector partitionPathVector = (VarCharVector)
root.getVector(PARTITION_PATH_METADATA_FIELD.getFieldName());
+ assertEquals("partition1", new String(partitionPathVector.get(0)),
"Partition path should match");
+
+ VarCharVector fileNameVector = (VarCharVector)
root.getVector(FILENAME_METADATA_FIELD.getFieldName());
+ assertEquals(path.getName(), new String(fileNameVector.get(0)), "File
name should match");
+
+ VarCharVector seqNoVector = (VarCharVector)
root.getVector(COMMIT_SEQNO_METADATA_FIELD.getFieldName());
+ String seqNo = new String(seqNoVector.get(0));
+ assertTrue(seqNo.startsWith(instantTime), "Sequence number should start
with instant time");
+
+ // Verify user data fields
+ IntVector idVector = (IntVector) root.getVector("id");
+ assertEquals(1, idVector.get(0), "ID should match");
+
+ VarCharVector nameVector = (VarCharVector) root.getVector("name");
+ assertEquals("Alice", new String(nameVector.get(0)), "Name should
match");
+
+ BigIntVector ageVector = (BigIntVector) root.getVector("age");
+ assertEquals(30L, ageVector.get(0), "Age should match");
+
+ arrowReader.close();
+ }
+ }
+
+ @Test
+ public void testWriteRowWithoutMetadataPopulation() throws Exception {
+ // Schema WITHOUT meta fields
+ StructType schema = createSchemaWithoutMetaFields();
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_without_metadata.lance");
+ HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+ path, schema, instantTime, taskContextSupplier, storage, false);
+
+ // Create row with just user data (no meta fields)
+ InternalRow row = createRow(1, "Bob", 25L);
+ HoodieKey key = new HoodieKey("key2", "partition2");
+
+ writer.writeRowWithMetadata(key, row);
+ writer.close();
+
+ // Verify using LanceFileReader
+ assertTrue(storage.exists(path));
+
+ try (BufferAllocator allocator = new RootAllocator();
+ LanceFileReader reader = LanceFileReader.open(path.toString(),
allocator)) {
+
+ assertEquals(1, reader.numRows());
+
+ // Schema should have ONLY 3 user fields (no meta fields)
+ assertEquals(3, reader.schema().getFields().size(), "Should have only
user fields");
+
+ // Read and verify data
+ ArrowReader arrowReader = reader.readAll(null, null, Integer.MAX_VALUE);
+ assertTrue(arrowReader.loadNextBatch());
+ VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+
+ // Verify NO meta fields exist
+ assertFalse(hasField(root, COMMIT_TIME_METADATA_FIELD.getFieldName()),
"Should not have commit time field");
+ assertFalse(hasField(root, RECORD_KEY_METADATA_FIELD.getFieldName()),
"Should not have record key field");
+
+ // Verify user data
+ IntVector idVector = (IntVector) root.getVector("id");
+ assertEquals(1, idVector.get(0));
+
+ VarCharVector nameVector = (VarCharVector) root.getVector("name");
+ assertEquals("Bob", new String(nameVector.get(0)));
+
+ arrowReader.close();
+ }
+ }
+
+ @Test
+ public void testWriteRowSimple() throws Exception {
+ StructType schema = createSchemaWithoutMetaFields();
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_simple_write.lance");
+ HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+ path, schema, instantTime, taskContextSupplier, storage, false);
+
+ InternalRow row = createRow(1, "Charlie", 35L);
+ writer.writeRow("key3", row);
+ writer.close();
+
+ // Verify file exists and has correct record count
+ try (BufferAllocator allocator = new RootAllocator();
+ LanceFileReader reader = LanceFileReader.open(path.toString(),
allocator)) {
+ assertEquals(1, reader.numRows());
+ }
+ }
+
+ @Test
+ public void testSequenceIdGeneration() throws Exception {
+ StructType schema = createSchemaWithMetaFields();
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_sequence_id.lance");
+ HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+ path, schema, instantTime, taskContextSupplier, storage, true);
+
+ // Write multiple records
+ for (int i = 0; i < 3; i++) {
+ InternalRow row = createRowWithMetaFields(i, "User" + i, 20L + i);
+ HoodieKey key = new HoodieKey("key" + i, "partition1");
+ writer.writeRowWithMetadata(key, row);
+ }
+ writer.close();
+
+ // Verify sequence IDs are unique and properly formatted
+ try (BufferAllocator allocator = new RootAllocator();
+ LanceFileReader reader = LanceFileReader.open(path.toString(),
allocator)) {
+
+ assertEquals(3, reader.numRows());
+
+ ArrowReader arrowReader = reader.readAll(null, null, Integer.MAX_VALUE);
+ arrowReader.loadNextBatch();
+ VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+
+ VarCharVector seqNoVector = (VarCharVector)
root.getVector(COMMIT_SEQNO_METADATA_FIELD.getFieldName());
+
+ List<String> seqNos = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ String seqNo = new String(seqNoVector.get(i));
+ assertTrue(seqNo.startsWith(instantTime + "_"), "Sequence ID should
start with instantTime");
+ seqNos.add(seqNo);
+ }
+
+ // All sequence IDs should be unique
+ assertEquals(3, seqNos.stream().distinct().count(), "All sequence IDs
should be unique");
+
+ arrowReader.close();
+ }
+ }
+
+ @Test
+ public void testBatchFlushing() throws Exception {
+ StructType schema = createSchemaWithoutMetaFields();
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_batch_flush.lance");
+ HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+ path, schema, instantTime, taskContextSupplier, storage, false);
+
+ // Write more than DEFAULT_BATCH_SIZE (1000) records
+ int recordCount = 2500;
+ for (int i = 0; i < recordCount; i++) {
+ InternalRow row = createRow(i, "User" + i, 20L + i);
+ writer.writeRow("key" + i, row);
+ }
+ writer.close();
+
+ // Verify all records were written
+ try (BufferAllocator allocator = new RootAllocator();
+ LanceFileReader reader = LanceFileReader.open(path.toString(),
allocator)) {
+ assertEquals(recordCount, reader.numRows(), "All records should be
written");
+ }
+ }
+
+ @Test
+ public void testPrimitiveTypes() throws Exception {
+ StructType schema = new StructType()
+ .add("int_field", DataTypes.IntegerType, false)
+ .add("long_field", DataTypes.LongType, false)
+ .add("float_field", DataTypes.FloatType, false)
+ .add("double_field", DataTypes.DoubleType, false)
+ .add("bool_field", DataTypes.BooleanType, false)
+ .add("string_field", DataTypes.StringType, false)
+ .add("binary_field", DataTypes.BinaryType, false);
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_primitives.lance");
+ HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+ path, schema, instantTime, taskContextSupplier, storage, false);
+
+ GenericInternalRow row = new GenericInternalRow(new Object[]{
+ 42, // int
+ 123456789L, // long
+ 3.14f, // float
+ 2.71828, // double
+ true, // boolean
+ UTF8String.fromString("test"), // string
+ new byte[]{1, 2, 3, 4} // binary
+ });
+
+ writer.writeRow("key1", row);
+ writer.close();
+
+ // Verify all types were written correctly
+ try (BufferAllocator allocator = new RootAllocator();
+ LanceFileReader reader = LanceFileReader.open(path.toString(),
allocator)) {
+
+ assertEquals(1, reader.numRows());
+ assertEquals(7, reader.schema().getFields().size());
+
+ ArrowReader arrowReader = reader.readAll(null, null, Integer.MAX_VALUE);
+ arrowReader.loadNextBatch();
+ VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+
+ assertEquals(42, ((IntVector) root.getVector("int_field")).get(0));
+ assertEquals(123456789L, ((BigIntVector)
root.getVector("long_field")).get(0));
+ assertEquals(3.14f, ((Float4Vector)
root.getVector("float_field")).get(0), 0.001);
+ assertEquals(2.71828, ((Float8Vector)
root.getVector("double_field")).get(0), 0.00001);
+ assertEquals(1, ((BitVector) root.getVector("bool_field")).get(0));
+ assertEquals("test", new String(((VarCharVector)
root.getVector("string_field")).get(0)));
+
+ byte[] binary = ((VarBinaryVector)
root.getVector("binary_field")).get(0);
+ assertEquals(4, binary.length);
+
+ arrowReader.close();
+ }
+ }
+
+ @Test
+ public void testNullValues() throws Exception {
+ StructType schema = new StructType()
+ .add("id", DataTypes.IntegerType, false)
+ .add("name", DataTypes.StringType, true)
+ .add("age", DataTypes.LongType, true);
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_nulls.lance");
+ HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+ path, schema, instantTime, taskContextSupplier, storage, false);
+
+ // Write rows with null values
+ writer.writeRow("key1", createRow(1, "Alice", 30L));
+ writer.writeRow("key2", createRow(2, null, 25L)); // null name
+ writer.writeRow("key3", createRow(3, "Charlie", null)); // null age
+ writer.close();
+
+ // Verify nulls are preserved
+ try (BufferAllocator allocator = new RootAllocator();
+ LanceFileReader reader = LanceFileReader.open(path.toString(),
allocator)) {
+
+ assertEquals(3, reader.numRows());
+
+ ArrowReader arrowReader = reader.readAll(null, null, Integer.MAX_VALUE);
+ arrowReader.loadNextBatch();
+ VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+
+ VarCharVector nameVector = (VarCharVector) root.getVector("name");
+ assertFalse(nameVector.isNull(0), "First name should not be null");
+ assertTrue(nameVector.isNull(1), "Second name should be null");
+ assertFalse(nameVector.isNull(2), "Third name should not be null");
+
+ BigIntVector ageVector = (BigIntVector) root.getVector("age");
+ assertFalse(ageVector.isNull(0), "First age should not be null");
+ assertFalse(ageVector.isNull(1), "Second age should not be null");
+ assertTrue(ageVector.isNull(2), "Third age should be null");
+
+ arrowReader.close();
+ }
+ }
+
+ @Test
+ public void testWriteEmptyDataset() throws Exception {
+ StructType schema = new StructType()
+ .add("id", DataTypes.IntegerType, false);
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_empty.lance");
+ HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+ path, schema, instantTime, taskContextSupplier, storage, false);
+ writer.close(); // Close without writing any rows
+
+ // Lance doesn't create a file if no data is written
+ assertFalse(storage.exists(path), "Lance file should not exist when no
data is written");
+ }
+
+ @Test
+ public void testWriteStructType() throws Exception {
+ // Create schema with nested struct
+ StructType addressSchema = new StructType()
+ .add("street", DataTypes.StringType, true)
+ .add("city", DataTypes.StringType, true)
+ .add("zipcode", DataTypes.IntegerType, true);
+
+ StructType schema = new StructType()
+ .add("id", DataTypes.IntegerType, false)
+ .add("name", DataTypes.StringType, true)
+ .add("address", addressSchema, true);
+
+ // Create test data with nested struct
+ GenericInternalRow address1 = new GenericInternalRow(new Object[]{
+ UTF8String.fromString("123 Main St"),
+ UTF8String.fromString("New York"),
+ 10001
+ });
+
+ GenericInternalRow address2 = new GenericInternalRow(new Object[]{
+ UTF8String.fromString("456 Oak Ave"),
+ UTF8String.fromString("Los Angeles"),
+ 90001
+ });
+
+ List<InternalRow> rows = new ArrayList<>();
+ rows.add(new GenericInternalRow(new Object[]{1,
UTF8String.fromString("Alice"), address1}));
+ rows.add(new GenericInternalRow(new Object[]{2,
UTF8String.fromString("Bob"), address2}));
+
+ StoragePath path = new StoragePath(tempDir.getAbsolutePath() +
"/test_struct.lance");
+ HoodieSparkLanceWriter writer = new HoodieSparkLanceWriter(
+ path, schema, instantTime, taskContextSupplier, storage, false);
+
+ for (InternalRow row : rows) {
+ writer.writeRow("key" + rows.indexOf(row), row);
+ }
+ writer.close();
+
+ assertTrue(storage.exists(path), "Lance file with struct type should
exist");
+ try (BufferAllocator allocator = new RootAllocator();
+ LanceFileReader reader = LanceFileReader.open(path.toString(),
allocator)) {
+ assertEquals(rows.size(), reader.numRows(), "Row count should match");
+ assertEquals(3, reader.schema().getFields().size(), "Should have 3
top-level fields (id, name, address)");
+ }
+ }
+
+ // Helper methods
+
+ private StructType createSchemaWithMetaFields() {
+ return new StructType()
+ .add(COMMIT_TIME_METADATA_FIELD.getFieldName(), DataTypes.StringType,
false)
+ .add(COMMIT_SEQNO_METADATA_FIELD.getFieldName(), DataTypes.StringType,
false)
+ .add(RECORD_KEY_METADATA_FIELD.getFieldName(), DataTypes.StringType,
false)
+ .add(PARTITION_PATH_METADATA_FIELD.getFieldName(),
DataTypes.StringType, false)
+ .add(FILENAME_METADATA_FIELD.getFieldName(), DataTypes.StringType,
false)
+ .add("id", DataTypes.IntegerType, false)
+ .add("name", DataTypes.StringType, true)
+ .add("age", DataTypes.LongType, true);
+ }
+
+ private StructType createSchemaWithoutMetaFields() {
+ return new StructType()
+ .add("id", DataTypes.IntegerType, false)
+ .add("name", DataTypes.StringType, true)
+ .add("age", DataTypes.LongType, true);
+ }
+
+ private InternalRow createRowWithMetaFields(Object... userValues) {
+ // Create row with PLACEHOLDER meta fields (will be updated by writer) +
user data
+ Object[] allValues = new Object[5 + userValues.length];
+
+ // Meta fields - use empty strings as placeholders
+ allValues[0] = UTF8String.fromString(""); // commit_time
+ allValues[1] = UTF8String.fromString(""); // commit_seqno
+ allValues[2] = UTF8String.fromString(""); // record_key
+ allValues[3] = UTF8String.fromString(""); // partition_path
+ allValues[4] = UTF8String.fromString(""); // file_name
+
+ // Copy user values
+ for (int i = 0; i < userValues.length; i++) {
+ allValues[5 + i] = processValue(userValues[i]);
+ }
+
+ return new GenericInternalRow(allValues);
+ }
+
+ private InternalRow createRow(Object... values) {
+ Object[] processedValues = new Object[values.length];
+ for (int i = 0; i < values.length; i++) {
+ processedValues[i] = processValue(values[i]);
+ }
+ return new GenericInternalRow(processedValues);
+ }
+
+ private Object processValue(Object value) {
+ if (value instanceof String) {
+ return UTF8String.fromString((String) value);
+ }
+ return value;
+ }
+
+ private boolean hasField(VectorSchemaRoot root, String fieldName) {
+ try {
+ return root.getVector(fieldName) != null;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+}
\ No newline at end of file
diff --git a/packaging/bundle-validation/docker_java17/docker_java17_test.sh
b/packaging/bundle-validation/docker_java17/docker_java17_test.sh
index 37aa745f3cd9..6fb4fbfb2167 100755
--- a/packaging/bundle-validation/docker_java17/docker_java17_test.sh
+++ b/packaging/bundle-validation/docker_java17/docker_java17_test.sh
@@ -107,14 +107,11 @@ stop_hdfs() {
}
build_hudi () {
- if [ "$SPARK_PROFILE" = "spark4.0" ]; then
- change_java_runtime_version
- else
- use_default_java_runtime
- fi
+ # Always use >= Java 11 for Arrow compatibility
+ change_java_runtime_version
mvn clean install -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true \
- -e -ntp -B -V -Dgpg.skip -Djacoco.skip -Pwarn-log \
+ -e -ntp -B -V -Dgpg.skip -Djacoco.skip -Pwarn-log -Djava.version=11 \
-Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.shade=warn \
-Dorg.slf4j.simpleLogger.log.org.apache.maven.plugins.dependency=warn \
-pl packaging/hudi-spark-bundle -am
diff --git a/pom.xml b/pom.xml
index 06102885ec7e..dc4b89bb21b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -238,6 +238,11 @@
<springboot.version>2.7.3</springboot.version>
<spring.shell.version>2.1.1</spring.shell.version>
<snappy.version>1.1.10.7</snappy.version>
+ <arrow.version>18.3.0</arrow.version>
+ <lance.version>0.38.0</lance.version>
+ <lance.spark.connector.version>0.0.13</lance.spark.connector.version>
+ <lance.spark.artifact>lance-spark-3.5_${scala.binary.version}</lance.spark.artifact>
+ <lance.skip.tests>false</lance.skip.tests>
<!-- The following properties are only used for Jacoco coverage report
aggregation -->
<copy.files>false</copy.files>
<copy.files.target.dir>${maven.multiModuleProjectDirectory}</copy.files.target.dir>
@@ -897,7 +902,29 @@
<version>${orc.spark.version}</version>
<scope>compile</scope>
</dependency>
-
+ <!-- Lance Core SDK -->
+ <dependency>
+ <groupId>com.lancedb</groupId>
+ <artifactId>lance-core</artifactId>
+ <version>${lance.version}</version>
+ </dependency>
+ <!-- Lance Spark for Arrow conversion -->
+ <dependency>
+ <groupId>com.lancedb</groupId>
+ <artifactId>${lance.spark.artifact}</artifactId>
+ <version>${lance.spark.connector.version}</version>
+ </dependency>
+ <!-- Apache Arrow -->
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-netty</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
<!-- RoaringBitmap -->
<dependency>
<groupId>org.roaringbitmap</groupId>
@@ -2432,6 +2459,9 @@
<!-- This glob has to include hudi-spark3-common,
hudi-spark3.2plus-common -->
<hudi.spark.common.module>hudi-spark3-common</hudi.spark.common.module>
<kafka.version>2.8.1</kafka.version>
+ <!-- Lance: Skip tests for Spark 3.3 due to lack of support -->
+ <lance.spark.artifact>lance-spark-base_${scala.binary.version}</lance.spark.artifact>
+ <lance.skip.tests>true</lance.skip.tests>
<!-- NOTE: Some Hudi modules require standalone Parquet/Orc/etc
file-format dependency (hudi-hive-sync,
hudi-hadoop-mr, for ex). Since these Hudi modules might be
used from w/in the execution engine(s)
bringing these file-formats as dependencies as well, we
need to make sure that versions are
@@ -2473,6 +2503,9 @@
<hudi.spark.common.module>hudi-spark3-common</hudi.spark.common.module>
<scalatest.version>${scalatest.spark3.version}</scalatest.version>
<kafka.version>3.3.2</kafka.version>
+ <!-- Lance: Use Spark 3.4-specific artifact -->
+ <lance.spark.artifact>lance-spark-3.4_${scala.binary.version}</lance.spark.artifact>
+ <lance.skip.tests>false</lance.skip.tests>
<!-- NOTE: Some Hudi modules require standalone Parquet/Orc/etc
file-format dependency (hudi-hive-sync,
hudi-hadoop-mr, for ex). Since these Hudi modules might be
used from w/in the execution engine(s)
bringing these file-formats as dependencies as well, we
need to make sure that versions are
@@ -2524,6 +2557,9 @@
<scalatest.version>${scalatest.spark3.version}</scalatest.version>
<kafka.version>3.4.1</kafka.version>
<hive.storage.version>2.8.1</hive.storage.version>
+ <!-- Lance: Use Spark 3.5-specific artifact -->
+ <lance.spark.artifact>lance-spark-3.5_${scala.binary.version}</lance.spark.artifact>
+ <lance.skip.tests>false</lance.skip.tests>
<!-- NOTE: Some Hudi modules require standalone Parquet/Orc/etc
file-format dependency (hudi-hive-sync,
hudi-hadoop-mr, for ex). Since these Hudi modules might be
used from w/in the execution engine(s)
bringing these file-formats as dependencies as well, we
need to make sure that versions are
@@ -2583,6 +2619,9 @@
<hadoop.version>3.4.0</hadoop.version>
<kafka.version>3.8.0</kafka.version>
<hive.storage.version>2.8.1</hive.storage.version>
+ <!-- Lance: Use Spark 4.0-specific artifact (Scala 2.13 only) -->
+ <lance.spark.artifact>lance-spark-4.0_2.13</lance.spark.artifact>
+ <lance.skip.tests>false</lance.skip.tests>
<!-- NOTE: Some Hudi modules require standalone Parquet/Orc/etc
file-format dependency (hudi-hive-sync,
hudi-hadoop-mr, for ex). Since these Hudi modules might be
used from w/in the execution engine(s)
bringing these file-formats as dependencies as well, we
need to make sure that versions are