This is an automated email from the ASF dual-hosted git repository.

kunwp1 pushed a commit to branch chris-big-object-version-0
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 8c68c05baca59d9ea5f83438d9ddacf697c88816
Author: Kunwoo Park <[email protected]>
AuthorDate: Tue Oct 14 13:48:08 2025 -0700

    Big Object for Java Native Operators
---
 .../engine/architecture/rpc/controlcommands.proto  |   1 +
 .../scheduling/RegionExecutionCoordinator.scala    |   3 +-
 .../InitializeExecutorHandler.scala                |   4 +
 .../dashboard/user/workflow/WorkflowResource.scala |   6 +
 .../texera/web/service/WorkflowService.scala       |   5 +
 .../engine/architecture/worker/WorkerSpec.scala    |   3 +-
 common/workflow-core/build.sbt                     |   5 +-
 .../amber/core/executor/ExecutionContext.scala     |  52 ++++++
 .../org/apache/amber/core/tuple/AttributeType.java |   3 +
 .../amber/core/tuple/AttributeTypeUtils.scala      |  21 ++-
 .../apache/amber/core/tuple/BigObjectPointer.java  |  79 ++++++++
 .../scala/org/apache/amber/util/IcebergUtil.scala  |  93 ++++++++--
 .../texera/service/util/BigObjectManager.scala     | 199 +++++++++++++++++++++
 .../texera/service/util/S3StorageClient.scala      | 131 ++++++++++++++
 .../operator/source/scan/FileAttributeType.java    |   5 +-
 .../source/scan/FileScanSourceOpExec.scala         |  11 +-
 file-service/build.sbt                             |   3 -
 sql/texera_ddl.sql                                 |   7 +
 sql/updates/big_object.sql                         |  33 ++++
 19 files changed, 630 insertions(+), 34 deletions(-)

diff --git a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto
index 41f0976314..d596f8b044 100644
--- a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto
+++ b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto
@@ -256,6 +256,7 @@ message InitializeExecutorRequest {
   int32 totalWorkerCount = 1;
   core.OpExecInitInfo opExecInitInfo = 2;
   bool isSource = 3;
+  int64 executionId = 4;
 }
 
 message UpdateExecutorRequest {
diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index bacc345a2c..588054fd0f 100644
--- a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -359,7 +359,8 @@ class RegionExecutionCoordinator(
                 InitializeExecutorRequest(
                   workerConfigs.length,
                   physicalOp.opExecInitInfo,
-                  physicalOp.isSourceOperator
+                  physicalOp.isSourceOperator,
+                  physicalOp.executionId.id
                 ),
                 asyncRPCClient.mkContext(workerId)
               )
diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
index 32a718606c..b1d6b2f609 100644
--- a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
+++ b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala
@@ -42,6 +42,10 @@ trait InitializeExecutorHandler {
     dp.serializationManager.setOpInitialization(req)
     val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId)
     val workerCount = req.totalWorkerCount
+
+    // Set execution context for this thread
+    ExecutionContext.setExecutionId(req.executionId)
+
     dp.executor = req.opExecInitInfo match {
       case OpExecWithClassName(className, descString) =>
         ExecFactory.newExecFromJavaClassName(className, descString, workerIdx, workerCount)
diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
index 01ae898a66..b132fc1d18 100644
--- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
+++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala
@@ -36,6 +36,7 @@ import org.apache.texera.dao.jooq.generated.tables.daos.{
   WorkflowUserAccessDao
 }
 import org.apache.texera.dao.jooq.generated.tables.pojos._
+import org.apache.texera.service.util.BigObjectManager
 import org.apache.texera.web.resource.dashboard.hub.EntityType
 import org.apache.texera.web.resource.dashboard.hub.HubResource.recordCloneAction
 import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowAccessResource.hasReadAccess
@@ -600,6 +601,11 @@ class WorkflowResource extends LazyLogging {
         .asScala
         .toList
 
+      // Delete big objects
+      eids.foreach { eid =>
+        BigObjectManager.delete(eid.toInt)
+      }
+
       // Collect all URIs related to executions for cleanup
       val uris = eids.flatMap { eid =>
         val executionId = ExecutionIdentity(eid.longValue())
diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
index 01c66fb458..35dd9fde27 100644
--- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
+++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala
@@ -46,6 +46,7 @@ import org.apache.amber.engine.architecture.worker.WorkflowWorker.{
 }
 import org.apache.amber.error.ErrorUtils.{getOperatorFromActorIdOpt, getStackTraceWithAllCauses}
 import org.apache.texera.dao.jooq.generated.tables.pojos.User
+import org.apache.texera.service.util.BigObjectManager
 import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent
 import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest
 import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
@@ -307,6 +308,7 @@ class WorkflowService(
     *  2. Clears URI references from the execution registry
     *  3. Safely clears all result and console message documents
     *  4. Expires Iceberg snapshots for runtime statistics
+    *  5. Deletes big objects from MinIO
     *
     * @param eid The execution identity to clean up resources for
     */
@@ -343,6 +345,9 @@ class WorkflowService(
           logger.debug(s"Error processing document at $uri: ${error.getMessage}")
       }
     }
+
+    // Delete big objects
+    BigObjectManager.delete(eid.id.toInt)
   }
 
 }
diff --git a/amber/src/test/scala/org/apache/amber/engine/architecture/worker/WorkerSpec.scala b/amber/src/test/scala/org/apache/amber/engine/architecture/worker/WorkerSpec.scala
index cc874f1c8a..ef54e706af 100644
--- a/amber/src/test/scala/org/apache/amber/engine/architecture/worker/WorkerSpec.scala
+++ b/amber/src/test/scala/org/apache/amber/engine/architecture/worker/WorkerSpec.scala
@@ -194,7 +194,8 @@ class WorkerSpec
       InitializeExecutorRequest(
         1,
         OpExecWithClassName("org.apache.amber.engine.architecture.worker.DummyOperatorExecutor"),
-        isSource = false
+        isSource = false,
+        1
       ),
       AsyncRPCContext(CONTROLLER, identifier1),
       4
diff --git a/common/workflow-core/build.sbt b/common/workflow-core/build.sbt
index c62fad7b4d..122c267271 100644
--- a/common/workflow-core/build.sbt
+++ b/common/workflow-core/build.sbt
@@ -182,5 +182,8 @@ libraryDependencies ++= Seq(
   "org.apache.commons" % "commons-vfs2" % "2.9.0",                     // for FileResolver throw VFS-related exceptions
   "io.lakefs" % "sdk" % "1.51.0",                                     // for lakeFS api calls
   "com.typesafe" % "config" % "1.4.3",                                 // config reader
-  "org.apache.commons" % "commons-jcs3-core" % "3.2"                  // Apache Commons JCS
+  "org.apache.commons" % "commons-jcs3-core" % "3.2",                 // Apache Commons JCS
+  "software.amazon.awssdk" % "s3" % "2.29.51",
+  "software.amazon.awssdk" % "auth" % "2.29.51",
+  "software.amazon.awssdk" % "regions" % "2.29.51",
 )
\ No newline at end of file
diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala
new file mode 100644
index 0000000000..af33e84d2a
--- /dev/null
+++ b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.amber.core.executor
+
+/**
+  * ExecutionContext provides thread-local access to execution metadata.
+  * This allows operator executors to access the execution ID.
+  */
+object ExecutionContext {
+  private val executionIdThreadLocal: ThreadLocal[Option[Int]] = ThreadLocal.withInitial(() => None)
+
+  /**
+    * Sets the execution ID for the current thread.
+    * Should be called when initializing an executor.
+    */
+  def setExecutionId(executionId: Long): Unit = {
+    executionIdThreadLocal.set(Some(executionId.toInt))
+  }
+
+  /**
+    * Gets the execution ID for the current thread.
+    * @return Some(executionId) if set, None otherwise
+    */
+  def getExecutionId: Option[Int] = {
+    executionIdThreadLocal.get()
+  }
+
+  /**
+    * Clears the execution ID for the current thread.
+    * Should be called when cleaning up.
+    */
+  def clear(): Unit = {
+    executionIdThreadLocal.remove()
+  }
+}
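
A minimal usage sketch of the thread-local context above (illustrative, not part of this patch; the clear() in a finally block is an assumption about how callers would avoid leaking the ID into pooled threads):

    import org.apache.amber.core.executor.ExecutionContext

    object ExecutionContextSketch {
      // Binds an execution ID to the current thread for the duration of `body`.
      def withExecutionId[T](executionId: Long)(body: => T): T = {
        ExecutionContext.setExecutionId(executionId) // bind the ID to this thread
        try body
        finally ExecutionContext.clear()             // do not leak into reused threads
      }

      def main(args: Array[String]): Unit = {
        val eid = withExecutionId(42L) {
          ExecutionContext.getExecutionId // Some(42) inside the bound scope
        }
        println(eid) // Some(42); outside the scope getExecutionId returns None
      }
    }
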
diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeType.java b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeType.java
index 73314fba0b..8791e92ff3 100644
--- a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeType.java
+++ b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeType.java
@@ -70,6 +70,7 @@ public enum AttributeType implements Serializable {
     BOOLEAN("boolean", Boolean.class),
     TIMESTAMP("timestamp", Timestamp.class),
     BINARY("binary", byte[].class),
+    BIG_OBJECT("big_object", BigObjectPointer.class),
     ANY("ANY", Object.class);
 
     private final String name;
@@ -109,6 +110,8 @@ public enum AttributeType implements Serializable {
             return TIMESTAMP;
         } else if (fieldClass.equals(byte[].class)) {
             return BINARY;
+        } else if (fieldClass.equals(BigObjectPointer.class)) {
+            return BIG_OBJECT;
         } else {
             return ANY;
         }
diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala
index e4fdcb4611..8184560204 100644
--- a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala
+++ b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala
@@ -121,14 +121,15 @@ object AttributeTypeUtils extends Serializable {
   ): Any = {
     if (field == null) return null
     attributeType match {
-      case AttributeType.INTEGER   => parseInteger(field, force)
-      case AttributeType.LONG      => parseLong(field, force)
-      case AttributeType.DOUBLE    => parseDouble(field)
-      case AttributeType.BOOLEAN   => parseBoolean(field)
-      case AttributeType.TIMESTAMP => parseTimestamp(field)
-      case AttributeType.STRING    => field.toString
-      case AttributeType.BINARY    => field
-      case AttributeType.ANY | _   => field
+      case AttributeType.INTEGER    => parseInteger(field, force)
+      case AttributeType.LONG       => parseLong(field, force)
+      case AttributeType.DOUBLE     => parseDouble(field)
+      case AttributeType.BOOLEAN    => parseBoolean(field)
+      case AttributeType.TIMESTAMP  => parseTimestamp(field)
+      case AttributeType.STRING     => field.toString
+      case AttributeType.BINARY     => field
+      case AttributeType.BIG_OBJECT => field // Big objects are created programmatically, not parsed
+      case AttributeType.ANY | _    => field
     }
   }
 
@@ -383,7 +384,9 @@ object AttributeTypeUtils extends Serializable {
       case AttributeType.INTEGER   => tryParseInteger(fieldValue)
       case AttributeType.TIMESTAMP => tryParseTimestamp(fieldValue)
       case AttributeType.BINARY    => tryParseString()
-      case _                       => tryParseString()
+      case AttributeType.BIG_OBJECT =>
+        AttributeType.BIG_OBJECT // Big objects are never inferred from data
+      case _ => tryParseString()
     }
   }
 
diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/BigObjectPointer.java b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/BigObjectPointer.java
new file mode 100644
index 0000000000..bfcfc840e7
--- /dev/null
+++ b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/BigObjectPointer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.amber.core.tuple;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonValue;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Objects;
+
+/**
+ * BigObjectPointer represents a pointer to a large object stored in S3.
+ * The pointer is formatted as a URI: s3://bucket/path/to/object
+ */
+public class BigObjectPointer implements Serializable {
+    
+    private final String uri;
+    private final transient URI parsedUri;
+    
+    @JsonCreator
+    public BigObjectPointer(@JsonProperty("uri") String uri) {
+        if (uri == null || !uri.startsWith("s3://")) {
+            throw new IllegalArgumentException("BigObjectPointer URI must start with 's3://' but was: " + uri);
+        }
+        this.uri = uri;
+        this.parsedUri = URI.create(uri);
+    }
+    
+    @JsonValue
+    public String getUri() {
+        return uri;
+    }
+    
+    public String getBucketName() {
+        return parsedUri.getHost();
+    }
+    
+    public String getObjectKey() {
+        String path = parsedUri.getPath();
+        return path.startsWith("/") ? path.substring(1) : path;
+    }
+    
+    @Override
+    public String toString() {
+        return uri;
+    }
+    
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (!(obj instanceof BigObjectPointer)) return false;
+        BigObjectPointer that = (BigObjectPointer) obj;
+        return Objects.equals(uri, that.uri);
+    }
+    
+    @Override
+    public int hashCode() {
+        return Objects.hash(uri);
+    }
+}
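
A short sketch of how the pointer parses its URI (illustrative; the bucket and key below are made-up values):

    import org.apache.amber.core.tuple.BigObjectPointer

    object BigObjectPointerSketch {
      def main(args: Array[String]): Unit = {
        // hypothetical URI of the kind produced by BigObjectManager.create
        val ptr = new BigObjectPointer("s3://texera-big-objects/1700000000000/3f9c2e1a")
        println(ptr.getBucketName) // texera-big-objects
        println(ptr.getObjectKey)  // 1700000000000/3f9c2e1a
        // URIs that do not start with "s3://" are rejected with IllegalArgumentException
      }
    }
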
diff --git a/common/workflow-core/src/main/scala/org/apache/amber/util/IcebergUtil.scala b/common/workflow-core/src/main/scala/org/apache/amber/util/IcebergUtil.scala
index bc17139641..c5c73660fa 100644
--- a/common/workflow-core/src/main/scala/org/apache/amber/util/IcebergUtil.scala
+++ b/common/workflow-core/src/main/scala/org/apache/amber/util/IcebergUtil.scala
@@ -20,7 +20,7 @@
 package org.apache.amber.util
 
 import org.apache.amber.config.StorageConfig
-import org.apache.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.amber.core.tuple.{Attribute, AttributeType, BigObjectPointer, Schema, Tuple}
 import org.apache.hadoop.conf.Configuration
 import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
 import org.apache.iceberg.data.parquet.GenericParquetReaders
@@ -52,6 +52,9 @@ import scala.jdk.CollectionConverters._
   */
 object IcebergUtil {
 
+  // Unique suffix for BIG_OBJECT field encoding
+  private val BIG_OBJECT_FIELD_SUFFIX = "__texera_big_obj_ptr"
+
   /**
     * Creates and initializes a HadoopCatalog with the given parameters.
     * - Uses an empty Hadoop `Configuration`, meaning the local file system (or `file:/`) will be used by default
@@ -207,7 +210,9 @@ object IcebergUtil {
   def toIcebergSchema(amberSchema: Schema): IcebergSchema = {
     val icebergFields = amberSchema.getAttributes.zipWithIndex.map {
       case (attribute, index) =>
-        Types.NestedField.optional(index + 1, attribute.getName, toIcebergType(attribute.getType))
+        // Encode field name for BIG_OBJECT types to ensure they can be detected when reading back
+        val encodedName = encodeBigObjectFieldName(attribute.getName, attribute.getType)
+        Types.NestedField.optional(index + 1, encodedName, toIcebergType(attribute.getType))
     }
     new IcebergSchema(icebergFields.asJava)
   }
@@ -227,6 +232,8 @@ object IcebergUtil {
       case AttributeType.BOOLEAN   => Types.BooleanType.get()
       case AttributeType.TIMESTAMP => Types.TimestampType.withoutZone()
       case AttributeType.BINARY    => Types.BinaryType.get()
+      case AttributeType.BIG_OBJECT =>
+        Types.StringType.get() // Store BigObjectPointer URI as string
       case AttributeType.ANY =>
         throw new IllegalArgumentException("ANY type is not supported in Iceberg")
     }
@@ -244,12 +251,14 @@ object IcebergUtil {
     tuple.schema.getAttributes.zipWithIndex.foreach {
       case (attribute, index) =>
         val value = tuple.getField[AnyRef](index) match {
-          case null               => null
-          case ts: Timestamp      => ts.toInstant.atZone(ZoneId.systemDefault()).toLocalDateTime
-          case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
-          case other              => other
+          case null                        => null
+          case ts: Timestamp               => ts.toInstant.atZone(ZoneId.systemDefault()).toLocalDateTime
+          case bytes: Array[Byte]          => ByteBuffer.wrap(bytes)
+          case bigObjPtr: BigObjectPointer => bigObjPtr.getUri
+          case other                       => other
         }
-        record.setField(attribute.getName, value)
+        val fieldName = encodeBigObjectFieldName(attribute.getName, attribute.getType)
+        record.setField(fieldName, value)
     }
 
     record
@@ -264,21 +273,54 @@ object IcebergUtil {
     */
   def fromRecord(record: Record, amberSchema: Schema): Tuple = {
     val fieldValues = amberSchema.getAttributes.map { attribute =>
-      val value = record.getField(attribute.getName) match {
-        case null               => null
-        case ldt: LocalDateTime => Timestamp.valueOf(ldt)
-        case buffer: ByteBuffer =>
+      val fieldName = encodeBigObjectFieldName(attribute.getName, attribute.getType)
+      val rawValue = record.getField(fieldName)
+
+      (rawValue, attribute.getType) match {
+        case (null, _)               => null
+        case (ldt: LocalDateTime, _) => Timestamp.valueOf(ldt)
+        case (buffer: ByteBuffer, _) =>
           val bytes = new Array[Byte](buffer.remaining())
           buffer.get(bytes)
           bytes
-        case other => other
+        case (uri: String, AttributeType.BIG_OBJECT) => new BigObjectPointer(uri)
+        case (other, _)                              => other
       }
-      value
     }
 
     Tuple(amberSchema, fieldValues.toArray)
   }
 
+  /**
+    * Encodes a field name for BIG_OBJECT types by adding a unique system suffix.
+    * This ensures BIG_OBJECT fields can be identified when reading from Iceberg.
+    *
+    * @param fieldName The original field name
+    * @param attributeType The attribute type
+    * @return The encoded field name with a unique suffix for BIG_OBJECT types
+    */
+  def encodeBigObjectFieldName(fieldName: String, attributeType: AttributeType): String = {
+    attributeType match {
+      case AttributeType.BIG_OBJECT => s"${fieldName}${BIG_OBJECT_FIELD_SUFFIX}"
+      case _                        => fieldName
+    }
+  }
+
+  /**
+    * Decodes a field name by removing the unique system suffix if present.
+    * This restores the original user-defined field name.
+    *
+    * @param fieldName The encoded field name
+    * @return The original field name with system suffix removed
+    */
+  def decodeBigObjectFieldName(fieldName: String): String = {
+    if (fieldName.endsWith(BIG_OBJECT_FIELD_SUFFIX)) {
+      fieldName.substring(0, fieldName.length - BIG_OBJECT_FIELD_SUFFIX.length)
+    } else {
+      fieldName
+    }
+  }
+
   /**
     * Converts an Iceberg `Schema` to an Amber `Schema`.
     *
@@ -290,7 +332,16 @@ object IcebergUtil {
       .columns()
       .asScala
       .map { field =>
-        new Attribute(field.name(), fromIcebergType(field.`type`().asPrimitiveType()))
+        // Determine the attribute type using field name hints
+        val attributeType = fromIcebergType(
+          field.`type`().asPrimitiveType(),
+          field.name()
+        )
+
+        // Decode field name to restore original name for BIG_OBJECT types
+        val originalName = decodeBigObjectFieldName(field.name())
+
+        new Attribute(originalName, attributeType)
       }
       .toList
 
@@ -301,11 +352,21 @@ object IcebergUtil {
     * Converts an Iceberg `Type` to an Amber `AttributeType`.
     *
     * @param icebergType The Iceberg Type.
+    * @param fieldName The field name (used to detect BIG_OBJECT by suffix).
     * @return The corresponding Amber AttributeType.
     */
-  def fromIcebergType(icebergType: PrimitiveType): AttributeType = {
+  def fromIcebergType(
+      icebergType: PrimitiveType,
+      fieldName: String = ""
+  ): AttributeType = {
     icebergType match {
-      case _: Types.StringType    => AttributeType.STRING
+      case _: Types.StringType =>
+        // Check for BIG_OBJECT by examining the unique suffix pattern
+        if (fieldName.nonEmpty && fieldName.endsWith(BIG_OBJECT_FIELD_SUFFIX)) {
+          AttributeType.BIG_OBJECT
+        } else {
+          AttributeType.STRING
+        }
       case _: Types.IntegerType   => AttributeType.INTEGER
       case _: Types.LongType      => AttributeType.LONG
       case _: Types.DoubleType    => AttributeType.DOUBLE
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
new file mode 100644
index 0000000000..19586372cc
--- /dev/null
+++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.util
+
+import com.typesafe.scalalogging.LazyLogging
+import org.apache.amber.core.tuple.BigObjectPointer
+import org.apache.texera.dao.SqlServer
+import org.apache.texera.dao.jooq.generated.Tables.BIG_OBJECT
+
+import java.io.{Closeable, InputStream}
+import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+import scala.jdk.CollectionConverters._
+
+/**
+  * BigObjectStream wraps an InputStream and tracks its lifecycle for proper cleanup.
+  */
+class BigObjectStream(private val inputStream: InputStream, private val pointer: BigObjectPointer)
+    extends InputStream
+    with Closeable {
+
+  @volatile private var closed = false
+
+  private def checkClosed(): Unit = {
+    if (closed) throw new IllegalStateException("Stream is closed")
+  }
+
+  override def read(): Int = {
+    checkClosed()
+    inputStream.read()
+  }
+
+  override def read(b: Array[Byte]): Int = {
+    checkClosed()
+    inputStream.read(b)
+  }
+
+  override def read(b: Array[Byte], off: Int, len: Int): Int = {
+    checkClosed()
+    inputStream.read(b, off, len)
+  }
+
+  override def available(): Int = if (closed) 0 else inputStream.available()
+
+  override def close(): Unit = {
+    if (!closed) {
+      closed = true
+      inputStream.close()
+      BigObjectManager.closeStream(pointer)
+    }
+  }
+
+  def isClosed: Boolean = closed
+  def getPointer: BigObjectPointer = pointer
+}
+
+/**
+  * BigObjectManager manages the lifecycle of large objects (>2GB) stored in S3.
+  */
+object BigObjectManager extends LazyLogging {
+  private val DEFAULT_BUCKET = "texera-big-objects"
+  private val openStreams = new ConcurrentHashMap[BigObjectPointer, BigObjectStream]()
+  private lazy val context = SqlServer.getInstance().createDSLContext()
+
+  /**
+    * Creates a big object from an InputStream using multipart upload.
+    * Handles streams of any size without loading into memory.
+    * Registers the big object in the database for cleanup tracking.
+    *
+    * @param executionId The execution ID this big object belongs to.
+    * @param stream The input stream containing the big object data.
+    * @return A BigObjectPointer that can be used in tuples.
+    */
+  def create(executionId: Int, stream: InputStream): BigObjectPointer = {
+    S3StorageClient.createBucketIfNotExist(DEFAULT_BUCKET)
+
+    val objectKey = s"${System.currentTimeMillis()}/${UUID.randomUUID()}"
+    val uri = s"s3://$DEFAULT_BUCKET/$objectKey"
+
+    // Upload to S3
+    S3StorageClient.uploadObject(DEFAULT_BUCKET, objectKey, stream)
+
+    // Register in database
+    try {
+      context
+        .insertInto(BIG_OBJECT)
+        .columns(BIG_OBJECT.EXECUTION_ID, BIG_OBJECT.URI)
+        .values(Int.box(executionId), uri)
+        .execute()
+      logger.debug(s"Registered big object: eid=$executionId, uri=$uri")
+    } catch {
+      case e: Exception =>
+        // Database failed - clean up S3 object
+        logger.error(s"Failed to register big object, cleaning up: $uri", e)
+        try {
+          S3StorageClient.deleteObject(DEFAULT_BUCKET, objectKey)
+        } catch {
+          case cleanupError: Exception =>
+            logger.error(s"Failed to cleanup orphaned S3 object: $uri", cleanupError)
+        }
+        throw new RuntimeException(s"Failed to create big object: ${e.getMessage}", e)
+    }
+
+    new BigObjectPointer(uri)
+  }
+
+  /**
+    * Opens a big object for reading.
+    *
+    * @param ptr The BigObjectPointer from a tuple field.
+    * @return A BigObjectStream for reading the big object data.
+    */
+  def open(ptr: BigObjectPointer): BigObjectStream = {
+    require(
+      S3StorageClient.objectExists(ptr.getBucketName, ptr.getObjectKey),
+      s"Big object does not exist: ${ptr.getUri}"
+    )
+
+    val inputStream = S3StorageClient.downloadObject(ptr.getBucketName, ptr.getObjectKey)
+    val stream = new BigObjectStream(inputStream, ptr)
+    openStreams.put(ptr, stream)
+    stream
+  }
+
+  /**
+    * Deletes all big objects associated with a specific execution ID.
+    * This is called during workflow execution cleanup.
+    *
+    * @param executionId The execution ID whose big objects should be deleted.
+    * Removes both the S3 objects and their rows in the big_object table.
+    */
+  def delete(executionId: Int): Unit = {
+    val uris = context
+      .select(BIG_OBJECT.URI)
+      .from(BIG_OBJECT)
+      .where(BIG_OBJECT.EXECUTION_ID.eq(executionId))
+      .fetchInto(classOf[String])
+      .asScala
+      .toList
+
+    if (uris.isEmpty) {
+      logger.debug(s"No big objects for execution $executionId")
+      return
+    }
+
+    logger.info(s"Deleting ${uris.size} big object(s) for execution $executionId")
+
+    // Delete each object from S3
+    uris.foreach { uri =>
+      try {
+        val ptr = new BigObjectPointer(uri)
+        Option(openStreams.get(ptr)).foreach(_.close())
+        S3StorageClient.deleteObject(ptr.getBucketName, ptr.getObjectKey)
+      } catch {
+        case e: Exception =>
+          logger.error(s"Failed to delete big object: $uri", e)
+      }
+    }
+
+    // Delete database records
+    context
+      .deleteFrom(BIG_OBJECT)
+      .where(BIG_OBJECT.EXECUTION_ID.eq(executionId))
+      .execute()
+  }
+
+  /**
+    * Closes the open stream associated with the given big object pointer, if any.
+    */
+  def close(ptr: BigObjectPointer): Unit = {
+    Option(openStreams.get(ptr)).foreach { stream =>
+      if (!stream.isClosed) stream.close()
+    }
+  }
+
+  /**
+    * Internal method to remove a stream from tracking when it's closed.
+  private[util] def closeStream(ptr: BigObjectPointer): Unit = {
+    openStreams.remove(ptr)
+  }
+}
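
A lifecycle sketch for the manager above (illustrative, not part of this patch; it assumes S3/MinIO and the big_object table are configured, and the execution ID and file path are made up):

    import java.io.FileInputStream
    import org.apache.texera.service.util.BigObjectManager

    object BigObjectLifecycleSketch {
      def main(args: Array[String]): Unit = {
        val executionId = 42 // hypothetical execution ID

        // 1. Upload: streams to S3 via multipart upload and records the URI in the DB.
        val in = new FileInputStream("/tmp/large-input.bin")
        val ptr = try BigObjectManager.create(executionId, in) finally in.close()

        // 2. Read back, e.g. in a downstream operator.
        val stream = BigObjectManager.open(ptr)
        try println(stream.read()) // first byte
        finally stream.close()     // also removes the stream from the manager's tracking

        // 3. Execution cleanup: removes both the S3 objects and the DB rows.
        BigObjectManager.delete(executionId)
      }
    }
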
diff --git a/file-service/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
similarity index 58%
rename from file-service/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
rename to common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
index 7c157cc0ae..a281f6f6c2 100644
--- a/file-service/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala
@@ -24,7 +24,9 @@ import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCrede
 import software.amazon.awssdk.regions.Region
 import software.amazon.awssdk.services.s3.model._
 import software.amazon.awssdk.services.s3.{S3Client, S3Configuration}
+import software.amazon.awssdk.core.sync.RequestBody
 
+import java.io.InputStream
 import java.security.MessageDigest
 import scala.jdk.CollectionConverters._
 
@@ -139,4 +141,133 @@ object S3StorageClient {
       s3Client.deleteObjects(deleteObjectsRequest)
     }
   }
+
+  /**
+    * Uploads an object to S3 using multipart upload.
+    * Handles streams of any size without loading into memory.
+    */
+  def uploadObject(bucketName: String, objectKey: String, inputStream: InputStream): String = {
+    val buffer = new Array[Byte](MINIMUM_NUM_OF_MULTIPART_S3_PART.toInt)
+    val uploadId = s3Client
+      .createMultipartUpload(
+        
CreateMultipartUploadRequest.builder().bucket(bucketName).key(objectKey).build()
+      )
+      .uploadId()
+
+    try {
+      val parts = Iterator
+        .from(1)
+        .map { partNumber =>
+          // Fill buffer completely
+          var offset = 0
+          var read = 0
+          while (
+            offset < buffer.length && {
+              read = inputStream.read(buffer, offset, buffer.length - offset); read > 0
+            }
+          ) {
+            offset += read
+          }
+
+          if (offset > 0) {
+            val eTag = s3Client
+              .uploadPart(
+                UploadPartRequest
+                  .builder()
+                  .bucket(bucketName)
+                  .key(objectKey)
+                  .uploadId(uploadId)
+                  .partNumber(partNumber)
+                  .build(),
+                RequestBody.fromBytes(buffer.take(offset))
+              )
+              .eTag()
+            Some(CompletedPart.builder().partNumber(partNumber).eTag(eTag).build())
+          } else None
+        }
+        .takeWhile(_.isDefined)
+        .flatten
+        .toList
+
+      s3Client
+        .completeMultipartUpload(
+          CompleteMultipartUploadRequest
+            .builder()
+            .bucket(bucketName)
+            .key(objectKey)
+            .uploadId(uploadId)
+            .multipartUpload(CompletedMultipartUpload.builder().parts(parts.asJava).build())
+            .build()
+        )
+        .eTag()
+
+    } catch {
+      case e: Exception =>
+        try {
+          s3Client.abortMultipartUpload(
+            AbortMultipartUploadRequest
+              .builder()
+              .bucket(bucketName)
+              .key(objectKey)
+              .uploadId(uploadId)
+              .build()
+          )
+        } catch { case _: Exception => }
+        throw e
+    }
+  }
+
+  /**
+    * Downloads an object from S3 as an InputStream.
+    *
+    * @param bucketName The S3 bucket name.
+    * @param objectKey The object key (path) in S3.
+    * @return An InputStream containing the object data.
+    */
+  def downloadObject(bucketName: String, objectKey: String): InputStream = {
+    val getObjectRequest = GetObjectRequest
+      .builder()
+      .bucket(bucketName)
+      .key(objectKey)
+      .build()
+
+    s3Client.getObject(getObjectRequest)
+  }
+
+  /**
+    * Checks if an object exists in S3.
+    *
+    * @param bucketName The S3 bucket name.
+    * @param objectKey The object key (path) in S3.
+    * @return True if the object exists, false otherwise.
+    */
+  def objectExists(bucketName: String, objectKey: String): Boolean = {
+    try {
+      val headObjectRequest = HeadObjectRequest
+        .builder()
+        .bucket(bucketName)
+        .key(objectKey)
+        .build()
+      s3Client.headObject(headObjectRequest)
+      true
+    } catch {
+      case _: Exception => false
+    }
+  }
+
+  /**
+    * Deletes a single object from S3.
+    *
+    * @param bucketName The S3 bucket name.
+    * @param objectKey The object key (path) in S3.
+    */
+  def deleteObject(bucketName: String, objectKey: String): Unit = {
+    val deleteObjectRequest = DeleteObjectRequest
+      .builder()
+      .bucket(bucketName)
+      .key(objectKey)
+      .build()
+
+    s3Client.deleteObject(deleteObjectRequest)
+  }
 }
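
A round-trip sketch for the client above (illustrative; the bucket and key are made-up values, and MinIO/S3 credentials are assumed to be configured):

    import java.io.ByteArrayInputStream
    import org.apache.texera.service.util.S3StorageClient

    object S3StorageClientSketch {
      def main(args: Array[String]): Unit = {
        val bucket = "texera-big-objects" // hypothetical bucket
        val key    = "examples/hello.txt" // hypothetical key

        S3StorageClient.createBucketIfNotExist(bucket)
        S3StorageClient.uploadObject(bucket, key, new ByteArrayInputStream("hello".getBytes))

        if (S3StorageClient.objectExists(bucket, key)) {
          val in = S3StorageClient.downloadObject(bucket, key)
          try println(new String(in.readAllBytes())) // hello
          finally in.close()
        }

        S3StorageClient.deleteObject(bucket, key)
      }
    }
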
diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileAttributeType.java b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileAttributeType.java
index 84e3de95a6..aa198a9d2d 100644
--- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileAttributeType.java
+++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileAttributeType.java
@@ -30,7 +30,8 @@ public enum FileAttributeType {
     DOUBLE("double", AttributeType.DOUBLE),
     BOOLEAN("boolean", AttributeType.BOOLEAN),
     TIMESTAMP("timestamp", AttributeType.TIMESTAMP),
-    BINARY("binary", AttributeType.BINARY);
+    BINARY("binary", AttributeType.BINARY),
+    BIG_OBJECT("big object", AttributeType.BIG_OBJECT);
 
 
     private final String name;
@@ -56,6 +57,6 @@ public enum FileAttributeType {
     }
 
     public boolean isSingle() {
-        return this == SINGLE_STRING || this == BINARY;
+        return this == SINGLE_STRING || this == BINARY || this == BIG_OBJECT;
     }
 }
diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
index 2124c9da43..1b5a012b00 100644
--- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala
@@ -19,7 +19,7 @@
 
 package org.apache.amber.operator.source.scan
 
-import org.apache.amber.core.executor.SourceOperatorExecutor
+import org.apache.amber.core.executor.{ExecutionContext, SourceOperatorExecutor}
 import org.apache.amber.core.storage.DocumentFactory
 import org.apache.amber.core.tuple.AttributeTypeUtils.parseField
 import org.apache.amber.core.tuple.TupleLike
@@ -27,6 +27,7 @@ import org.apache.amber.util.JSONUtils.objectMapper
 import org.apache.commons.compress.archivers.ArchiveStreamFactory
 import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream
 import org.apache.commons.io.IOUtils.toByteArray
+import org.apache.texera.service.util.BigObjectManager
 
 import java.io._
 import java.net.URI
@@ -84,6 +85,14 @@ class FileScanSourceOpExec private[scan] (
             fields.addOne(desc.attributeType match {
               case FileAttributeType.SINGLE_STRING =>
                 new String(toByteArray(entry), desc.fileEncoding.getCharset)
+              case FileAttributeType.BIG_OBJECT =>
+                // For big objects, create a big object pointer from the input stream
+                // Get execution ID from thread-local context
+                val executionId = ExecutionContext.getExecutionId
+                  .getOrElse(
+                    throw new IllegalStateException("Execution ID not set in ExecutionContext")
+                  )
+                BigObjectManager.create(executionId, entry)
               case _ => parseField(toByteArray(entry), desc.attributeType.getType)
             })
             TupleLike(fields.toSeq: _*)
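
The operator-side pattern above, condensed into a sketch (illustrative; entryStream stands for whatever InputStream the operator is reading):

    import java.io.InputStream
    import org.apache.amber.core.executor.ExecutionContext
    import org.apache.amber.core.tuple.BigObjectPointer
    import org.apache.texera.service.util.BigObjectManager

    object BigObjectFieldSketch {
      // Turns an operator's input stream into a BIG_OBJECT field value.
      def toBigObjectField(entryStream: InputStream): BigObjectPointer = {
        val executionId = ExecutionContext.getExecutionId
          .getOrElse(throw new IllegalStateException("Execution ID not set in ExecutionContext"))
        BigObjectManager.create(executionId, entryStream)
      }
    }
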
diff --git a/file-service/build.sbt b/file-service/build.sbt
index 68ac82e6b3..34b30472e0 100644
--- a/file-service/build.sbt
+++ b/file-service/build.sbt
@@ -84,7 +84,4 @@ libraryDependencies ++= Seq(
   "jakarta.ws.rs" % "jakarta.ws.rs-api" % "3.1.0", // Ensure Jakarta JAX-RS API is available
   "org.bitbucket.b_c" % "jose4j" % "0.9.6",
   "org.playframework" %% "play-json" % "3.1.0-M1",
-  "software.amazon.awssdk" % "s3" % "2.29.51",
-  "software.amazon.awssdk" % "auth" % "2.29.51",
-  "software.amazon.awssdk" % "regions" % "2.29.51",
 )
diff --git a/sql/texera_ddl.sql b/sql/texera_ddl.sql
index 7b0f9b9063..188f4b8e98 100644
--- a/sql/texera_ddl.sql
+++ b/sql/texera_ddl.sql
@@ -443,3 +443,10 @@ BEGIN
 END $$;
 
 -- END Fulltext search index creation (DO NOT EDIT THIS LINE)
+
+CREATE TABLE big_object (
+  execution_id INT NOT NULL,
+  uri TEXT NOT NULL UNIQUE,
+  creation_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+  FOREIGN KEY (execution_id) REFERENCES workflow_executions(eid) ON DELETE CASCADE
+);
\ No newline at end of file
diff --git a/sql/updates/big_object.sql b/sql/updates/big_object.sql
new file mode 100644
index 0000000000..0ffaabcc2c
--- /dev/null
+++ b/sql/updates/big_object.sql
@@ -0,0 +1,33 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- ============================================
+-- 1. Connect to the texera_db database
+-- ============================================
+\c texera_db
+
+SET search_path TO texera_db;
+
+-- ============================================
+-- 2. Update the table schema
+-- ============================================
+CREATE TABLE big_object (
+  execution_id INT NOT NULL,
+  uri TEXT NOT NULL UNIQUE,
+  creation_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+  FOREIGN KEY (execution_id) REFERENCES workflow_executions(eid) ON DELETE CASCADE
+);
\ No newline at end of file

