This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 4874143e7d7328b1a220c542b45cd45b8a8135a9
Author: Michael Blow <[email protected]>
AuthorDate: Fri Oct 3 16:10:27 2025 -0400

    [NO ISSUE][*DB][EXT] Make IDataParser extend Closeable, misc
    
    Ext-ref: MB-68827
    Change-Id: Id8d97f948d048e5c204bb0b78c089420b76dddc9
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20443
    Reviewed-by: Michael Blow <[email protected]>
    Reviewed-by: Hussain Towaileb <[email protected]>
    Tested-by: Michael Blow <[email protected]>
---
 .../java/org/apache/asterix/active/ActiveManager.java  |  1 +
 .../org/apache/asterix/external/api/IDataParser.java   | 13 +++++++++----
 .../apache/asterix/external/api/IRecordConverter.java  |  9 +++++++--
 .../dataflow/FeedRecordDataFlowController.java         |  2 ++
 .../data/std/util/ByteArrayAccessibleOutputStream.java | 18 ++++++++++++++++++
 .../org/apache/hyracks/http/server/utils/HttpUtil.java |  6 ++++--
 .../common/dataflow/IndexDropOperatorNodePushable.java |  2 +-
 .../storage/am/lsm/common/impls/LSMHarness.java        | 16 ++++++++++++++++
 8 files changed, 58 insertions(+), 9 deletions(-)

diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index a75ee33979..0a3ce800fb 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -192,6 +192,7 @@ public class ActiveManager {
                     + " this node before the cluster controller sent the stop request");
         } else {
             executor.execute(() -> {
+                Thread.currentThread().setName(runtimeId.toString());
                 try {
                     stopIfRunning(runtime, content.getTimeout(), content.getUnit());
                 } catch (Throwable th) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
index 5dbc383feb..d3a744831d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.external.api;
 
+import java.io.Closeable;
 import java.io.DataOutput;
 
 import org.apache.asterix.builders.IARecordBuilder;
@@ -37,14 +38,18 @@ import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
-public interface IDataParser {
+public interface IDataParser extends Closeable {
+    @Override
+    default void close() {
+        // Default no-op
+    }
 
     /*
      * The following two static methods are expensive. right now, they are used by RSSFeeds and
      * Twitter feed
      * TODO: Get rid of them
      */
-    public static void writeRecord(AMutableRecord record, DataOutput dataOutput, IARecordBuilder recordBuilder)
+    static void writeRecord(AMutableRecord record, DataOutput dataOutput, IARecordBuilder recordBuilder)
             throws HyracksDataException {
         ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
         int numFields = record.getType().getFieldNames().length;
@@ -58,7 +63,7 @@ public interface IDataParser {
     }
 
     @SuppressWarnings("unchecked")
-    public static void writeObject(IAObject obj, DataOutput dataOutput) throws HyracksDataException {
+    static void writeObject(IAObject obj, DataOutput dataOutput) throws HyracksDataException {
         switch (obj.getType().getTypeTag()) {
             case OBJECT: {
                 IARecordBuilder recordBuilder = new RecordBuilder();
@@ -105,7 +110,7 @@ public interface IDataParser {
         }
     }
 
-    public static <T> void toBytes(T serializable, ArrayBackedValueStorage buffer, ISerializerDeserializer<T> serde)
+    static <T> void toBytes(T serializable, ArrayBackedValueStorage buffer, ISerializerDeserializer<T> serde)
             throws HyracksDataException {
         buffer.reset();
         DataOutput out = buffer.getDataOutput();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
index f544ca041b..abb41f2a0f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
@@ -18,13 +18,14 @@
  */
 package org.apache.asterix.external.api;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.function.LongSupplier;
 
 @FunctionalInterface
-public interface IRecordConverter<I, O> {
+public interface IRecordConverter<I, O> extends Closeable {
 
-    public O convert(IRawRecord<? extends I> input) throws IOException;
+    O convert(IRawRecord<? extends I> input) throws IOException;
 
     /**
      * Configures the converter with information suppliers from the {@link IRecordReader} data source.
@@ -33,4 +34,8 @@ public interface IRecordConverter<I, O> {
      */
     default void configure(LongSupplier lineNumber) {
     }
+
+    default void close() {
+        // default no-op
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 4279ebd712..6ef54974b3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -252,6 +252,8 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw HyracksDataException.create(e);
+            } finally {
+                dataParser.close();
             }
             return true;
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
index 1a806cd1fd..17f9a2e649 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ByteArrayAccessibleOutputStream.java
@@ -125,4 +125,22 @@ public class ByteArrayAccessibleOutputStream extends ByteArrayOutputStream {
         ensureCapacity(bytesRequired);
         count = bytesRequired;
     }
+
+    /**
+     * Reset the stream and shrink the internal buffer if its size is larger than newSize.
+     * @param newSize
+     * @return true if the internal buffer was reallocated, false otherwise.
+     */
+    public boolean resetAndShrink(int newSize) {
+        reset();
+        if (buf.length > newSize) {
+            buf = new byte[newSize];
+            return true;
+        }
+        return false;
+    }
+
+    public int capacity() {
+        return buf.length;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index 9f01123c0b..cac8719194 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@ -240,7 +240,8 @@ public class HttpUtil {
                 try {
                     response.close();
                 } catch (IOException e) {
-                    LOGGER.debug("{} ignoring exception thrown on stream close due to interrupt", description, e);
+                    LOGGER.debug("{} ignoring exception thrown on stream close due to interrupt: {}", description,
+                            String.valueOf(e));
                 }
             });
             try {
@@ -249,7 +250,8 @@ public class HttpUtil {
                 LOGGER.warn("{} did not exit on stream close due to interrupt after 1s", description);
                 readFuture.cancel(true);
             } catch (ExecutionException ee) {
-                LOGGER.debug("ignoring exception awaiting aborted {} shutdown", description, ee);
+                LOGGER.debug("ignoring exception awaiting aborted {} shutdown: {}", description,
+                        String.valueOf(ee.getCause()));
             }
             throw ex;
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
index 3b6669ef52..5b9053b108 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
@@ -84,7 +84,7 @@ public class IndexDropOperatorNodePushable extends AbstractOperatorNodePushable
                 return;
             } catch (HyracksDataException e) {
                 if (isIgnorable(e)) {
-                    LOGGER.debug("Ignoring exception on drop", e);
+                    LOGGER.debug("Ignoring exception on drop: {}", String.valueOf(e));
                     return;
                 }
                 if (canRetry(e)) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 017e767721..2ac29fe640 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -19,6 +19,9 @@
 
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -63,6 +66,8 @@ import org.apache.logging.log4j.Logger;
 
 public class LSMHarness implements ILSMHarness {
     private static final Logger LOGGER = LogManager.getLogger();
+    private static final DateTimeFormatter OP_THREAD_TIMESTAMP =
+            DateTimeFormatter.ofPattern("HH:mm:ss").withZone(ZoneId.systemDefault());
 
     protected final ILSMIndex lsmIndex;
     protected final ILSMIOOperationScheduler ioScheduler;
@@ -543,7 +548,9 @@ public class LSMHarness implements ILSMHarness {
     }
 
     public void doIo(ILSMIOOperation operation) {
+        String origName = Thread.currentThread().getName();
         try {
+            Thread.currentThread().setName(threadName(operation));
             operation.getCallback().beforeOperation(operation);
             ILSMDiskComponent newComponent = operation.getIOOpertionType() == LSMIOOperationType.FLUSH
                     ? lsmIndex.flush(operation) : lsmIndex.merge(operation);
@@ -569,6 +576,7 @@ public class LSMHarness implements ILSMHarness {
                             lsmIndex, th);
                 }
             }
+            Thread.currentThread().setName(origName);
         }
         // if the operation failed, we need to cleanup files
         if (operation.getStatus() == LSMIOOperationStatus.FAILURE) {
@@ -576,6 +584,14 @@ public class LSMHarness implements ILSMHarness {
         }
     }
 
+    private static String threadName(ILSMIOOperation operation) {
+        if (operation.getIOOpertionType() == LSMIOOperationType.NOOP) {
+            return String.valueOf(operation.getIOOpertionType());
+        }
+        return operation.getIOOpertionType() + ":" + operation.getTarget().getRelativePath() + "@"
+                + OP_THREAD_TIMESTAMP.format(ZonedDateTime.now());
+    }
+
     @Override
     public void merge(ILSMIOOperation operation) throws HyracksDataException {
         if (LOGGER.isDebugEnabled()) {

Reply via email to