This is an automated email from the ASF dual-hosted git repository.

kunwp1 pushed a commit to branch chris-big-object-version-0
in repository https://gitbox.apache.org/repos/asf/texera.git
commit 8c68c05baca59d9ea5f83438d9ddacf697c88816 Author: Kunwoo Park <[email protected]> AuthorDate: Tue Oct 14 13:48:08 2025 -0700 Big Object for Java Native Operators --- .../engine/architecture/rpc/controlcommands.proto | 1 + .../scheduling/RegionExecutionCoordinator.scala | 3 +- .../InitializeExecutorHandler.scala | 4 + .../dashboard/user/workflow/WorkflowResource.scala | 6 + .../texera/web/service/WorkflowService.scala | 5 + .../engine/architecture/worker/WorkerSpec.scala | 3 +- common/workflow-core/build.sbt | 5 +- .../amber/core/executor/ExecutionContext.scala | 52 ++++++ .../org/apache/amber/core/tuple/AttributeType.java | 3 + .../amber/core/tuple/AttributeTypeUtils.scala | 21 ++- .../apache/amber/core/tuple/BigObjectPointer.java | 79 ++++++++ .../scala/org/apache/amber/util/IcebergUtil.scala | 93 ++++++++-- .../texera/service/util/BigObjectManager.scala | 199 +++++++++++++++++++++ .../texera/service/util/S3StorageClient.scala | 131 ++++++++++++++ .../operator/source/scan/FileAttributeType.java | 5 +- .../source/scan/FileScanSourceOpExec.scala | 11 +- file-service/build.sbt | 3 - sql/texera_ddl.sql | 7 + sql/updates/big_object.sql | 33 ++++ 19 files changed, 630 insertions(+), 34 deletions(-) diff --git a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto index 41f0976314..d596f8b044 100644 --- a/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto +++ b/amber/src/main/protobuf/org/apache/amber/engine/architecture/rpc/controlcommands.proto @@ -256,6 +256,7 @@ message InitializeExecutorRequest { int32 totalWorkerCount = 1; core.OpExecInitInfo opExecInitInfo = 2; bool isSource = 3; + int64 executionId = 4; } message UpdateExecutorRequest { diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index bacc345a2c..588054fd0f 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -359,7 +359,8 @@ class RegionExecutionCoordinator( InitializeExecutorRequest( workerConfigs.length, physicalOp.opExecInitInfo, - physicalOp.isSourceOperator + physicalOp.isSourceOperator, + physicalOp.executionId.id ), asyncRPCClient.mkContext(workerId) ) diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala index 32a718606c..b1d6b2f609 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala @@ -42,6 +42,10 @@ trait InitializeExecutorHandler { dp.serializationManager.setOpInitialization(req) val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId) val workerCount = req.totalWorkerCount + + // Set execution context for this thread + ExecutionContext.setExecutionId(req.executionId) + dp.executor = req.opExecInitInfo match { case OpExecWithClassName(className, descString) => ExecFactory.newExecFromJavaClassName(className, descString, workerIdx, workerCount) 
diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala index 01ae898a66..b132fc1d18 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowResource.scala @@ -36,6 +36,7 @@ import org.apache.texera.dao.jooq.generated.tables.daos.{ WorkflowUserAccessDao } import org.apache.texera.dao.jooq.generated.tables.pojos._ +import org.apache.texera.service.util.BigObjectManager import org.apache.texera.web.resource.dashboard.hub.EntityType import org.apache.texera.web.resource.dashboard.hub.HubResource.recordCloneAction import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowAccessResource.hasReadAccess @@ -600,6 +601,11 @@ class WorkflowResource extends LazyLogging { .asScala .toList + // Delete big objects + eids.foreach { eid => + BigObjectManager.delete(eid.toInt) + } + // Collect all URIs related to executions for cleanup val uris = eids.flatMap { eid => val executionId = ExecutionIdentity(eid.longValue()) diff --git a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala index 01c66fb458..35dd9fde27 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala @@ -46,6 +46,7 @@ import org.apache.amber.engine.architecture.worker.WorkflowWorker.{ } import org.apache.amber.error.ErrorUtils.{getOperatorFromActorIdOpt, getStackTraceWithAllCauses} import org.apache.texera.dao.jooq.generated.tables.pojos.User +import org.apache.texera.service.util.BigObjectManager import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent import org.apache.texera.web.model.websocket.request.WorkflowExecuteRequest import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource @@ -307,6 +308,7 @@ class WorkflowService( * 2. Clears URI references from the execution registry * 3. Safely clears all result and console message documents * 4. Expires Iceberg snapshots for runtime statistics + * 5. 
Deletes big objects from MinIO * * @param eid The execution identity to clean up resources for */ @@ -343,6 +345,9 @@ class WorkflowService( logger.debug(s"Error processing document at $uri: ${error.getMessage}") } } + + // Delete big objects + BigObjectManager.delete(eid.id.toInt) } } diff --git a/amber/src/test/scala/org/apache/amber/engine/architecture/worker/WorkerSpec.scala b/amber/src/test/scala/org/apache/amber/engine/architecture/worker/WorkerSpec.scala index cc874f1c8a..ef54e706af 100644 --- a/amber/src/test/scala/org/apache/amber/engine/architecture/worker/WorkerSpec.scala +++ b/amber/src/test/scala/org/apache/amber/engine/architecture/worker/WorkerSpec.scala @@ -194,7 +194,8 @@ class WorkerSpec InitializeExecutorRequest( 1, OpExecWithClassName("org.apache.amber.engine.architecture.worker.DummyOperatorExecutor"), - isSource = false + isSource = false, + 1 ), AsyncRPCContext(CONTROLLER, identifier1), 4 diff --git a/common/workflow-core/build.sbt b/common/workflow-core/build.sbt index c62fad7b4d..122c267271 100644 --- a/common/workflow-core/build.sbt +++ b/common/workflow-core/build.sbt @@ -182,5 +182,8 @@ libraryDependencies ++= Seq( "org.apache.commons" % "commons-vfs2" % "2.9.0", // for FileResolver throw VFS-related exceptions "io.lakefs" % "sdk" % "1.51.0", // for lakeFS api calls "com.typesafe" % "config" % "1.4.3", // config reader - "org.apache.commons" % "commons-jcs3-core" % "3.2" // Apache Commons JCS + "org.apache.commons" % "commons-jcs3-core" % "3.2", // Apache Commons JCS + "software.amazon.awssdk" % "s3" % "2.29.51", + "software.amazon.awssdk" % "auth" % "2.29.51", + "software.amazon.awssdk" % "regions" % "2.29.51", ) \ No newline at end of file diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala new file mode 100644 index 0000000000..af33e84d2a --- /dev/null +++ b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/ExecutionContext.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.amber.core.executor + +/** + * ExecutionContext provides thread-local access to execution metadata. + * This allows operator executors to access execution ID. + */ +object ExecutionContext { + private val executionIdThreadLocal: ThreadLocal[Option[Int]] = ThreadLocal.withInitial(() => None) + + /** + * Sets the execution ID for the current thread. + * Should be called when initializing an executor. + */ + def setExecutionId(executionId: Long): Unit = { + executionIdThreadLocal.set(Some(executionId.toInt)) + } + + /** + * Gets the execution ID for the current thread. 
+ * @return Some(executionId) if set, None otherwise + */ + def getExecutionId: Option[Int] = { + executionIdThreadLocal.get() + } + + /** + * Clears the execution ID for the current thread. + * Should be called when cleaning up. + */ + def clear(): Unit = { + executionIdThreadLocal.remove() + } +} diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeType.java b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeType.java index 73314fba0b..8791e92ff3 100644 --- a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeType.java +++ b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeType.java @@ -70,6 +70,7 @@ public enum AttributeType implements Serializable { BOOLEAN("boolean", Boolean.class), TIMESTAMP("timestamp", Timestamp.class), BINARY("binary", byte[].class), + BIG_OBJECT("big_object", BigObjectPointer.class), ANY("ANY", Object.class); private final String name; @@ -109,6 +110,8 @@ public enum AttributeType implements Serializable { return TIMESTAMP; } else if (fieldClass.equals(byte[].class)) { return BINARY; + } else if (fieldClass.equals(BigObjectPointer.class)) { + return BIG_OBJECT; } else { return ANY; } diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala index e4fdcb4611..8184560204 100644 --- a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala +++ b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/AttributeTypeUtils.scala @@ -121,14 +121,15 @@ object AttributeTypeUtils extends Serializable { ): Any = { if (field == null) return null attributeType match { - case AttributeType.INTEGER => parseInteger(field, force) - case AttributeType.LONG => parseLong(field, force) - case AttributeType.DOUBLE => parseDouble(field) - case AttributeType.BOOLEAN => parseBoolean(field) - case AttributeType.TIMESTAMP => parseTimestamp(field) - case AttributeType.STRING => field.toString - case AttributeType.BINARY => field - case AttributeType.ANY | _ => field + case AttributeType.INTEGER => parseInteger(field, force) + case AttributeType.LONG => parseLong(field, force) + case AttributeType.DOUBLE => parseDouble(field) + case AttributeType.BOOLEAN => parseBoolean(field) + case AttributeType.TIMESTAMP => parseTimestamp(field) + case AttributeType.STRING => field.toString + case AttributeType.BINARY => field + case AttributeType.BIG_OBJECT => field // Big objects are created programmatically, not parsed + case AttributeType.ANY | _ => field } } @@ -383,7 +384,9 @@ object AttributeTypeUtils extends Serializable { case AttributeType.INTEGER => tryParseInteger(fieldValue) case AttributeType.TIMESTAMP => tryParseTimestamp(fieldValue) case AttributeType.BINARY => tryParseString() - case _ => tryParseString() + case AttributeType.BIG_OBJECT => + AttributeType.BIG_OBJECT // Big objects are never inferred from data + case _ => tryParseString() } } diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/BigObjectPointer.java b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/BigObjectPointer.java new file mode 100644 index 0000000000..bfcfc840e7 --- /dev/null +++ b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/BigObjectPointer.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.amber.core.tuple; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; + +import java.io.Serializable; +import java.net.URI; +import java.util.Objects; + +/** + * BigObjectPointer represents a pointer to a large object stored in S3. + * The pointer is formatted as a URI: s3://bucket/path/to/object + */ +public class BigObjectPointer implements Serializable { + + private final String uri; + private final transient URI parsedUri; + + @JsonCreator + public BigObjectPointer(@JsonProperty("uri") String uri) { + if (uri == null || !uri.startsWith("s3://")) { + throw new IllegalArgumentException("BigObjectPointer URI must start with 's3://' but was: " + uri); + } + this.uri = uri; + this.parsedUri = URI.create(uri); + } + + @JsonValue + public String getUri() { + return uri; + } + + public String getBucketName() { + return parsedUri.getHost(); + } + + public String getObjectKey() { + String path = parsedUri.getPath(); + return path.startsWith("/") ? path.substring(1) : path; + } + + @Override + public String toString() { + return uri; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof BigObjectPointer)) return false; + BigObjectPointer that = (BigObjectPointer) obj; + return Objects.equals(uri, that.uri); + } + + @Override + public int hashCode() { + return Objects.hash(uri); + } +} diff --git a/common/workflow-core/src/main/scala/org/apache/amber/util/IcebergUtil.scala b/common/workflow-core/src/main/scala/org/apache/amber/util/IcebergUtil.scala index bc17139641..c5c73660fa 100644 --- a/common/workflow-core/src/main/scala/org/apache/amber/util/IcebergUtil.scala +++ b/common/workflow-core/src/main/scala/org/apache/amber/util/IcebergUtil.scala @@ -20,7 +20,7 @@ package org.apache.amber.util import org.apache.amber.config.StorageConfig -import org.apache.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} +import org.apache.amber.core.tuple.{Attribute, AttributeType, BigObjectPointer, Schema, Tuple} import org.apache.hadoop.conf.Configuration import org.apache.iceberg.catalog.{Catalog, TableIdentifier} import org.apache.iceberg.data.parquet.GenericParquetReaders @@ -52,6 +52,9 @@ import scala.jdk.CollectionConverters._ */ object IcebergUtil { + // Unique suffix for BIG_OBJECT field encoding + private val BIG_OBJECT_FIELD_SUFFIX = "__texera_big_obj_ptr" + /** * Creates and initializes a HadoopCatalog with the given parameters. 
* - Uses an empty Hadoop `Configuration`, meaning the local file system (or `file:/`) will be used by default @@ -207,7 +210,9 @@ object IcebergUtil { def toIcebergSchema(amberSchema: Schema): IcebergSchema = { val icebergFields = amberSchema.getAttributes.zipWithIndex.map { case (attribute, index) => - Types.NestedField.optional(index + 1, attribute.getName, toIcebergType(attribute.getType)) + // Encode field name for BIG_OBJECT types to ensure they can be detected when reading back + val encodedName = encodeBigObjectFieldName(attribute.getName, attribute.getType) + Types.NestedField.optional(index + 1, encodedName, toIcebergType(attribute.getType)) } new IcebergSchema(icebergFields.asJava) } @@ -227,6 +232,8 @@ object IcebergUtil { case AttributeType.BOOLEAN => Types.BooleanType.get() case AttributeType.TIMESTAMP => Types.TimestampType.withoutZone() case AttributeType.BINARY => Types.BinaryType.get() + case AttributeType.BIG_OBJECT => + Types.StringType.get() // Store BigObjectPointer URI as string case AttributeType.ANY => throw new IllegalArgumentException("ANY type is not supported in Iceberg") } @@ -244,12 +251,14 @@ object IcebergUtil { tuple.schema.getAttributes.zipWithIndex.foreach { case (attribute, index) => val value = tuple.getField[AnyRef](index) match { - case null => null - case ts: Timestamp => ts.toInstant.atZone(ZoneId.systemDefault()).toLocalDateTime - case bytes: Array[Byte] => ByteBuffer.wrap(bytes) - case other => other + case null => null + case ts: Timestamp => ts.toInstant.atZone(ZoneId.systemDefault()).toLocalDateTime + case bytes: Array[Byte] => ByteBuffer.wrap(bytes) + case bigObjPtr: BigObjectPointer => bigObjPtr.getUri + case other => other } - record.setField(attribute.getName, value) + val fieldName = encodeBigObjectFieldName(attribute.getName, attribute.getType) + record.setField(fieldName, value) } record @@ -264,21 +273,54 @@ object IcebergUtil { */ def fromRecord(record: Record, amberSchema: Schema): Tuple = { val fieldValues = amberSchema.getAttributes.map { attribute => - val value = record.getField(attribute.getName) match { - case null => null - case ldt: LocalDateTime => Timestamp.valueOf(ldt) - case buffer: ByteBuffer => + val fieldName = encodeBigObjectFieldName(attribute.getName, attribute.getType) + val rawValue = record.getField(fieldName) + + (rawValue, attribute.getType) match { + case (null, _) => null + case (ldt: LocalDateTime, _) => Timestamp.valueOf(ldt) + case (buffer: ByteBuffer, _) => val bytes = new Array[Byte](buffer.remaining()) buffer.get(bytes) bytes - case other => other + case (uri: String, AttributeType.BIG_OBJECT) => new BigObjectPointer(uri) + case (other, _) => other } - value } Tuple(amberSchema, fieldValues.toArray) } + /** + * Encodes a field name for BIG_OBJECT types by adding a unique system suffix. + * This ensures BIG_OBJECT fields can be identified when reading from Iceberg. + * + * @param fieldName The original field name + * @param attributeType The attribute type + * @return The encoded field name with a unique suffix for BIG_OBJECT types + */ + def encodeBigObjectFieldName(fieldName: String, attributeType: AttributeType): String = { + attributeType match { + case AttributeType.BIG_OBJECT => s"${fieldName}${BIG_OBJECT_FIELD_SUFFIX}" + case _ => fieldName + } + } + + /** + * Decodes a field name by removing the unique system suffix if present. + * This restores the original user-defined field name. 
+ * + * @param fieldName The encoded field name + * @return The original field name with system suffix removed + */ + def decodeBigObjectFieldName(fieldName: String): String = { + if (fieldName.endsWith(BIG_OBJECT_FIELD_SUFFIX)) { + fieldName.substring(0, fieldName.length - BIG_OBJECT_FIELD_SUFFIX.length) + } else { + fieldName + } + } + /** * Converts an Iceberg `Schema` to an Amber `Schema`. * @@ -290,7 +332,16 @@ object IcebergUtil { .columns() .asScala .map { field => - new Attribute(field.name(), fromIcebergType(field.`type`().asPrimitiveType())) + // Determine the attribute type using field name hints + val attributeType = fromIcebergType( + field.`type`().asPrimitiveType(), + field.name() + ) + + // Decode field name to restore original name for BIG_OBJECT types + val originalName = decodeBigObjectFieldName(field.name()) + + new Attribute(originalName, attributeType) } .toList @@ -301,11 +352,21 @@ object IcebergUtil { * Converts an Iceberg `Type` to an Amber `AttributeType`. * * @param icebergType The Iceberg Type. + * @param fieldName The field name (used to detect BIG_OBJECT by suffix). * @return The corresponding Amber AttributeType. */ - def fromIcebergType(icebergType: PrimitiveType): AttributeType = { + def fromIcebergType( + icebergType: PrimitiveType, + fieldName: String = "" + ): AttributeType = { icebergType match { - case _: Types.StringType => AttributeType.STRING + case _: Types.StringType => + // Check for BIG_OBJECT by examining the unique suffix pattern + if (fieldName.nonEmpty && fieldName.endsWith(BIG_OBJECT_FIELD_SUFFIX)) { + AttributeType.BIG_OBJECT + } else { + AttributeType.STRING + } case _: Types.IntegerType => AttributeType.INTEGER case _: Types.LongType => AttributeType.LONG case _: Types.DoubleType => AttributeType.DOUBLE diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala new file mode 100644 index 0000000000..19586372cc --- /dev/null +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectManager.scala @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.service.util + +import com.typesafe.scalalogging.LazyLogging +import org.apache.amber.core.tuple.BigObjectPointer +import org.apache.texera.dao.SqlServer +import org.apache.texera.dao.jooq.generated.Tables.BIG_OBJECT + +import java.io.{Closeable, InputStream} +import java.util.UUID +import java.util.concurrent.ConcurrentHashMap +import scala.jdk.CollectionConverters._ + +/** + * BigObjectStream wraps an InputStream and tracks its lifecycle for proper cleanup. 
+ */ +class BigObjectStream(private val inputStream: InputStream, private val pointer: BigObjectPointer) + extends InputStream + with Closeable { + + @volatile private var closed = false + + private def checkClosed(): Unit = { + if (closed) throw new IllegalStateException("Stream is closed") + } + + override def read(): Int = { + checkClosed() + inputStream.read() + } + + override def read(b: Array[Byte]): Int = { + checkClosed() + inputStream.read(b) + } + + override def read(b: Array[Byte], off: Int, len: Int): Int = { + checkClosed() + inputStream.read(b, off, len) + } + + override def available(): Int = if (closed) 0 else inputStream.available() + + override def close(): Unit = { + if (!closed) { + closed = true + inputStream.close() + BigObjectManager.closeStream(pointer) + } + } + + def isClosed: Boolean = closed + def getPointer: BigObjectPointer = pointer +} + +/** + * BigObjectManager manages the lifecycle of large objects (>2GB) stored in S3. + */ +object BigObjectManager extends LazyLogging { + private val DEFAULT_BUCKET = "texera-big-objects" + private val openStreams = new ConcurrentHashMap[BigObjectPointer, BigObjectStream]() + private lazy val context = SqlServer.getInstance().createDSLContext() + + /** + * Creates a big object from an InputStream using multipart upload. + * Handles streams of any size without loading into memory. + * Registers the big object in the database for cleanup tracking. + * + * @param executionId The execution ID this big object belongs to. + * @param stream The input stream containing the big object data. + * @return A BigObjectPointer that can be used in tuples. + */ + def create(executionId: Int, stream: InputStream): BigObjectPointer = { + S3StorageClient.createBucketIfNotExist(DEFAULT_BUCKET) + + val objectKey = s"${System.currentTimeMillis()}/${UUID.randomUUID()}" + val uri = s"s3://$DEFAULT_BUCKET/$objectKey" + + // Upload to S3 + S3StorageClient.uploadObject(DEFAULT_BUCKET, objectKey, stream) + + // Register in database + try { + context + .insertInto(BIG_OBJECT) + .columns(BIG_OBJECT.EXECUTION_ID, BIG_OBJECT.URI) + .values(Int.box(executionId), uri) + .execute() + logger.debug(s"Registered big object: eid=$executionId, uri=$uri") + } catch { + case e: Exception => + // Database failed - clean up S3 object + logger.error(s"Failed to register big object, cleaning up: $uri", e) + try { + S3StorageClient.deleteObject(DEFAULT_BUCKET, objectKey) + } catch { + case cleanupError: Exception => + logger.error(s"Failed to cleanup orphaned S3 object: $uri", cleanupError) + } + throw new RuntimeException(s"Failed to create big object: ${e.getMessage}", e) + } + + new BigObjectPointer(uri) + } + + /** + * Opens a big object for reading. + * + * @param ptr The BigObjectPointer from a tuple field. + * @return A BigObjectStream for reading the big object data. + */ + def open(ptr: BigObjectPointer): BigObjectStream = { + require( + S3StorageClient.objectExists(ptr.getBucketName, ptr.getObjectKey), + s"Big object does not exist: ${ptr.getUri}" + ) + + val inputStream = S3StorageClient.downloadObject(ptr.getBucketName, ptr.getObjectKey) + val stream = new BigObjectStream(inputStream, ptr) + openStreams.put(ptr, stream) + stream + } + + /** + * Deletes all big objects associated with a specific execution ID. + * This is called during workflow execution cleanup. + * + * @param executionId The execution ID whose big objects should be deleted. + * @return The number of big objects successfully deleted from S3. 
+ */ + def delete(executionId: Int): Unit = { + val uris = context + .select(BIG_OBJECT.URI) + .from(BIG_OBJECT) + .where(BIG_OBJECT.EXECUTION_ID.eq(executionId)) + .fetchInto(classOf[String]) + .asScala + .toList + + if (uris.isEmpty) { + logger.debug(s"No big objects for execution $executionId") + return + } + + logger.info(s"Deleting ${uris.size} big object(s) for execution $executionId") + + // Delete each object from S3 + uris.foreach { uri => + try { + val ptr = new BigObjectPointer(uri) + Option(openStreams.get(ptr)).foreach(_.close()) + S3StorageClient.deleteObject(ptr.getBucketName, ptr.getObjectKey) + } catch { + case e: Exception => + logger.error(s"Failed to delete big object: $uri", e) + } + } + + // Delete database records + context + .deleteFrom(BIG_OBJECT) + .where(BIG_OBJECT.EXECUTION_ID.eq(executionId)) + .execute() + } + + /** + * Closes a big object stream. Typically called automatically when the stream is closed. + */ + def close(ptr: BigObjectPointer): Unit = { + Option(openStreams.get(ptr)).foreach { stream => + if (!stream.isClosed) stream.close() + } + } + + /** + * Internal method to remove a stream from tracking when it's closed. + */ + private[util] def closeStream(ptr: BigObjectPointer): Unit = { + openStreams.remove(ptr) + } +} diff --git a/file-service/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala similarity index 58% rename from file-service/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala rename to common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala index 7c157cc0ae..a281f6f6c2 100644 --- a/file-service/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala @@ -24,7 +24,9 @@ import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCrede import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.s3.model._ import software.amazon.awssdk.services.s3.{S3Client, S3Configuration} +import software.amazon.awssdk.core.sync.RequestBody +import java.io.InputStream import java.security.MessageDigest import scala.jdk.CollectionConverters._ @@ -139,4 +141,133 @@ object S3StorageClient { s3Client.deleteObjects(deleteObjectsRequest) } } + + /** + * Uploads an object to S3 using multipart upload. + * Handles streams of any size without loading into memory. 
+ */ + def uploadObject(bucketName: String, objectKey: String, inputStream: InputStream): String = { + val buffer = new Array[Byte](MINIMUM_NUM_OF_MULTIPART_S3_PART.toInt) + val uploadId = s3Client + .createMultipartUpload( + CreateMultipartUploadRequest.builder().bucket(bucketName).key(objectKey).build() + ) + .uploadId() + + try { + val parts = Iterator + .from(1) + .map { partNumber => + // Fill buffer completely + var offset = 0 + var read = 0 + while ( + offset < buffer.length && { + read = inputStream.read(buffer, offset, buffer.length - offset); read > 0 + } + ) { + offset += read + } + + if (offset > 0) { + val eTag = s3Client + .uploadPart( + UploadPartRequest + .builder() + .bucket(bucketName) + .key(objectKey) + .uploadId(uploadId) + .partNumber(partNumber) + .build(), + RequestBody.fromBytes(buffer.take(offset)) + ) + .eTag() + Some(CompletedPart.builder().partNumber(partNumber).eTag(eTag).build()) + } else None + } + .takeWhile(_.isDefined) + .flatten + .toList + + s3Client + .completeMultipartUpload( + CompleteMultipartUploadRequest + .builder() + .bucket(bucketName) + .key(objectKey) + .uploadId(uploadId) + .multipartUpload(CompletedMultipartUpload.builder().parts(parts.asJava).build()) + .build() + ) + .eTag() + + } catch { + case e: Exception => + try { + s3Client.abortMultipartUpload( + AbortMultipartUploadRequest + .builder() + .bucket(bucketName) + .key(objectKey) + .uploadId(uploadId) + .build() + ) + } catch { case _: Exception => } + throw e + } + } + + /** + * Downloads an object from S3 as an InputStream. + * + * @param bucketName The S3 bucket name. + * @param objectKey The object key (path) in S3. + * @return An InputStream containing the object data. + */ + def downloadObject(bucketName: String, objectKey: String): InputStream = { + val getObjectRequest = GetObjectRequest + .builder() + .bucket(bucketName) + .key(objectKey) + .build() + + s3Client.getObject(getObjectRequest) + } + + /** + * Checks if an object exists in S3. + * + * @param bucketName The S3 bucket name. + * @param objectKey The object key (path) in S3. + * @return True if the object exists, false otherwise. + */ + def objectExists(bucketName: String, objectKey: String): Boolean = { + try { + val headObjectRequest = HeadObjectRequest + .builder() + .bucket(bucketName) + .key(objectKey) + .build() + s3Client.headObject(headObjectRequest) + true + } catch { + case _: Exception => false + } + } + + /** + * Deletes a single object from S3. + * + * @param bucketName The S3 bucket name. + * @param objectKey The object key (path) in S3. 
+ */ + def deleteObject(bucketName: String, objectKey: String): Unit = { + val deleteObjectRequest = DeleteObjectRequest + .builder() + .bucket(bucketName) + .key(objectKey) + .build() + + s3Client.deleteObject(deleteObjectRequest) + } } diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileAttributeType.java b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileAttributeType.java index 84e3de95a6..aa198a9d2d 100644 --- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileAttributeType.java +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileAttributeType.java @@ -30,7 +30,8 @@ public enum FileAttributeType { DOUBLE("double", AttributeType.DOUBLE), BOOLEAN("boolean", AttributeType.BOOLEAN), TIMESTAMP("timestamp", AttributeType.TIMESTAMP), - BINARY("binary", AttributeType.BINARY); + BINARY("binary", AttributeType.BINARY), + BIG_OBJECT("big object", AttributeType.BIG_OBJECT); private final String name; @@ -56,6 +57,6 @@ public enum FileAttributeType { } public boolean isSingle() { - return this == SINGLE_STRING || this == BINARY; + return this == SINGLE_STRING || this == BINARY || this == BIG_OBJECT; } } diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala index 2124c9da43..1b5a012b00 100644 --- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/source/scan/FileScanSourceOpExec.scala @@ -19,7 +19,7 @@ package org.apache.amber.operator.source.scan -import org.apache.amber.core.executor.SourceOperatorExecutor +import org.apache.amber.core.executor.{ExecutionContext, SourceOperatorExecutor} import org.apache.amber.core.storage.DocumentFactory import org.apache.amber.core.tuple.AttributeTypeUtils.parseField import org.apache.amber.core.tuple.TupleLike @@ -27,6 +27,7 @@ import org.apache.amber.util.JSONUtils.objectMapper import org.apache.commons.compress.archivers.ArchiveStreamFactory import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream import org.apache.commons.io.IOUtils.toByteArray +import org.apache.texera.service.util.BigObjectManager import java.io._ import java.net.URI @@ -84,6 +85,14 @@ class FileScanSourceOpExec private[scan] ( fields.addOne(desc.attributeType match { case FileAttributeType.SINGLE_STRING => new String(toByteArray(entry), desc.fileEncoding.getCharset) + case FileAttributeType.BIG_OBJECT => + // For big objects, create a big object pointer from the input stream + // Get execution ID from thread-local context + val executionId = ExecutionContext.getExecutionId + .getOrElse( + throw new IllegalStateException("Execution ID not set in ExecutionContext") + ) + BigObjectManager.create(executionId, entry) case _ => parseField(toByteArray(entry), desc.attributeType.getType) }) TupleLike(fields.toSeq: _*) diff --git a/file-service/build.sbt b/file-service/build.sbt index 68ac82e6b3..34b30472e0 100644 --- a/file-service/build.sbt +++ b/file-service/build.sbt @@ -84,7 +84,4 @@ libraryDependencies ++= Seq( "jakarta.ws.rs" % "jakarta.ws.rs-api" % "3.1.0", // Ensure Jakarta JAX-RS API is available "org.bitbucket.b_c" % "jose4j" % "0.9.6", "org.playframework" %% "play-json" % "3.1.0-M1", - "software.amazon.awssdk" % "s3" % "2.29.51", - 
"software.amazon.awssdk" % "auth" % "2.29.51", - "software.amazon.awssdk" % "regions" % "2.29.51", ) diff --git a/sql/texera_ddl.sql b/sql/texera_ddl.sql index 7b0f9b9063..188f4b8e98 100644 --- a/sql/texera_ddl.sql +++ b/sql/texera_ddl.sql @@ -443,3 +443,10 @@ BEGIN END $$; -- END Fulltext search index creation (DO NOT EDIT THIS LINE) + +CREATE TABLE big_object ( + execution_id INT NOT NULL, + uri TEXT NOT NULL UNIQUE, + creation_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (execution_id) REFERENCES workflow_executions(eid) ON DELETE CASCADE +); \ No newline at end of file diff --git a/sql/updates/big_object.sql b/sql/updates/big_object.sql new file mode 100644 index 0000000000..0ffaabcc2c --- /dev/null +++ b/sql/updates/big_object.sql @@ -0,0 +1,33 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ============================================ +-- 1. Connect to the texera_db database +-- ============================================ +\c texera_db + +SET search_path TO texera_db; + +-- ============================================ +-- 2. Update the table schema +-- ============================================ +CREATE TABLE big_object ( + execution_id INT NOT NULL, + uri TEXT NOT NULL UNIQUE, + creation_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (execution_id) REFERENCES workflow_executions(eid) ON DELETE CASCADE +); \ No newline at end of file
