This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 95f77257b45373d2bfc65d85d41559305283d3cb
Merge: 384570e475 e27f6f95ad
Author: Michael Blow <[email protected]>
AuthorDate: Wed Oct 15 19:58:53 2025 -0400

    Merge branch 'gerrit/trinity' into 'gerrit/phoenix'
    
     * [NO ISSUE][*DB][EXT] Make IDataParser extend Closeable, misc
     * [NO ISSUE][EXT]: Use ABFSS instead of deprecated WASBS
     * [NO ISSUE][*DB][STO] Fix IndexOutOfBoundsException removing last page...
    
    Ext-ref: MB-67819
    Change-Id: I49f69c95163d54b4c199f234291047fd01c27775

 .../org/apache/asterix/active/ActiveManager.java   |  1 +
 .../apache/asterix/external/api/IDataParser.java   | 13 ++++++---
 .../asterix/external/api/IRecordConverter.java     |  8 +++++-
 .../dataflow/FeedRecordDataFlowController.java     |  2 ++
 .../util/azure/blob_storage/AzureConstants.java    |  2 +-
 asterixdb/pom.xml                                  | 16 ++++-------
 .../std/util/ByteArrayAccessibleOutputStream.java  | 18 +++++++++++++
 .../apache/hyracks/http/server/utils/HttpUtil.java |  6 +++--
 .../dataflow/IndexDropOperatorNodePushable.java    |  2 +-
 .../storage/am/lsm/common/impls/LSMHarness.java    | 31 ++++++++++++++++------
 10 files changed, 71 insertions(+), 28 deletions(-)

diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
index e4ab8bdd16,abb41f2a0f..792fb4c4ea
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
@@@ -18,9 -18,24 +18,15 @@@
   */
  package org.apache.asterix.external.api;
  
+ import java.io.Closeable;
  import java.io.IOException;
 -import java.util.function.LongSupplier;
  
  @FunctionalInterface
- public interface IRecordConverter<I, O> {
+ public interface IRecordConverter<I, O> extends Closeable {
+ 
      O convert(IRawRecord<? extends I> input) throws IOException;
+ 
 -    /**
 -     * Configures the converter with information suppliers from the {@link IRecordReader} data source.
 -     *
 -     * @param lineNumber line number supplier
 -     */
 -    default void configure(LongSupplier lineNumber) {
 -    }
 -
+     default void close() {
+         // default no-op
+     }
  }
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index b1660a500b,236d853b4c..09b2cecc9b
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@@ -19,12 -19,11 +19,15 @@@
  
  package org.apache.hyracks.storage.am.lsm.common.impls;
  
 +import static 
org.apache.hyracks.util.ExitUtil.EC_INCONSISTENT_STORAGE_REFERENCES;
 +
+ import java.time.ZoneId;
+ import java.time.ZonedDateTime;
+ import java.time.format.DateTimeFormatter;
  import java.util.ArrayList;
  import java.util.List;
 +import java.util.Set;
 +import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.function.Predicate;
  
@@@ -144,14 -136,11 +149,12 @@@ public class LSMHarness implements ILSM
          int numEntered = 0;
          boolean entranceSuccessful = false;
          try {
--            final int componentsCount = components.size();
--            for (int i = 0; i < componentsCount; i++) {
--                final ILSMComponent component = components.get(i);
++            for (final ILSMComponent component : components) {
                  boolean isMutableComponent = numEntered == 0 && 
component.getType() == LSMComponentType.MEMORY;
                  if (!component.threadEnter(opType, isMutableComponent)) {
 +                    if (dumpState) {
 +                        LOGGER.warn("couldn't enter component: {}", 
component.dumpState());
 +                    }
                      break;
                  }
                  numEntered++;
@@@ -262,13 -246,43 +265,13 @@@
                  }
              }
          } finally {
 -            /*
 -             * cleanup inactive disk components if any
 -             */
 -            if (inactiveDiskComponentsToBeDeleted != null) {
 -                try {
 -                    //schedule a replication job to delete these inactive disk components from replicas
 -                    if (replicationEnabled) {
 -                        lsmIndex.scheduleReplication(null, 
inactiveDiskComponentsToBeDeleted,
 -                                ReplicationOperation.DELETE, opType);
 -                    }
 -                    for (ILSMDiskComponent c : 
inactiveDiskComponentsToBeDeleted) {
 -                        c.deactivateAndDestroy();
 -                    }
 -                } catch (Throwable e) { // NOSONAR Log and re-throw
 -                    if (LOGGER.isWarnEnabled()) {
 -                        LOGGER.log(Level.WARN, "Failure scheduling 
replication or destroying merged component", e);
 -                    }
 -                    throw e; // NOSONAR: The last call in the finally clause
 -                }
 -            }
 +            // the memory components clean up must be done first to avoid any unexpected exceptions during the rest
 +            // of the finally block
              if (inactiveMemoryComponentsToBeCleanedUp != null) {
 -                for (ILSMMemoryComponent c : 
inactiveMemoryComponentsToBeCleanedUp) {
 -                    tracer.instant(c.toString(), traceCategory, Scope.p, 
lsmIndex::toString);
 -                    c.cleanup();
 -                    synchronized (opTracker) {
 -                        c.reset();
 -                        // Notify all waiting threads whenever the mutable component's state
 -                        // has changed to inactive. This is important because even though we switched
 -                        // the mutable components, it is possible that the component that we just
 -                        // switched to is still busy flushing its data to disk. Thus, the notification
 -                        // that was issued upon scheduling the flush is not enough.
 -                        opTracker.notifyAll(); // NOSONAR: Always called inside synchronized block
 -                    }
 -                }
 +                
cleanupInactiveMemoryComponents(inactiveMemoryComponentsToBeCleanedUp);
              }
 -            if (opType == LSMOperationType.FLUSH) {
 -                ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) 
ctx.getComponentHolder().get(0);
 +            if (opType == LSMOperationType.FLUSH && !failedOperation) {
-                 ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) 
ctx.getComponentHolder().get(0);
++                ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) 
ctx.getComponentHolder().getFirst();
                  // We must call flushed without synchronizing on opTracker to avoid deadlocks
                  flushingComponent.flushed();
              }
@@@ -383,7 -387,7 +386,7 @@@
          }
          getAndEnterComponents(ctx, LSMOperationType.MODIFICATION, false);
          try {
--            AbstractLSMMemoryComponent c = (AbstractLSMMemoryComponent) 
ctx.getComponentHolder().get(0);
++            AbstractLSMMemoryComponent c = (AbstractLSMMemoryComponent) 
ctx.getComponentHolder().getFirst();
              c.getMetadata().put(key, value);
              c.setModified();
          } finally {
@@@ -407,7 -411,7 +410,7 @@@
          }
          getAndEnterComponents(ctx, LSMOperationType.FORCE_MODIFICATION, 
false);
          try {
--            AbstractLSMMemoryComponent c = (AbstractLSMMemoryComponent) 
ctx.getComponentHolder().get(0);
++            AbstractLSMMemoryComponent c = (AbstractLSMMemoryComponent) 
ctx.getComponentHolder().getFirst();
              c.getMetadata().put(key, value);
              c.setModified();
          } finally {
@@@ -427,7 -431,7 +430,8 @@@
          try {
              lsmIndex.modify(ctx, tuple);
              // The mutable component is always in the first index.
--            AbstractLSMMemoryComponent mutableComponent = 
(AbstractLSMMemoryComponent) ctx.getComponentHolder().get(0);
++            AbstractLSMMemoryComponent mutableComponent =
++                    (AbstractLSMMemoryComponent) 
ctx.getComponentHolder().getFirst();
              mutableComponent.setModified();
          } catch (Exception e) {
              failedOperation = true;
@@@ -547,9 -549,11 +551,11 @@@
      }
  
      public void doIo(ILSMIOOperation operation) {
+         String origName = Thread.currentThread().getName();
          try {
+             Thread.currentThread().setName(threadName(operation));
              operation.getCallback().beforeOperation(operation);
 -            ILSMDiskComponent newComponent = operation.getIOOpertionType() == 
LSMIOOperationType.FLUSH
 +            ILSMDiskComponent newComponent = operation.getIOOperationType() 
== LSMIOOperationType.FLUSH
                      ? lsmIndex.flush(operation) : lsmIndex.merge(operation);
              operation.setNewComponent(newComponent);
              operation.getCallback().afterOperation(operation);
@@@ -569,10 -573,11 +575,11 @@@
                  operation.setStatus(LSMIOOperationStatus.FAILURE);
                  operation.setFailure(th);
                  if (LOGGER.isErrorEnabled()) {
 -                    LOGGER.log(Level.ERROR, "{} operation.afterFinalize 
failed on {}", operation.getIOOpertionType(),
 -                            lsmIndex, th);
 +                    LOGGER.error("{} operation.afterFinalize failed on {}", 
operation.getIOOperationType(), lsmIndex,
 +                            th);
                  }
              }
+             Thread.currentThread().setName(origName);
          }
          // if the operation failed, we need to cleanup files
          if (operation.getStatus() == LSMIOOperationStatus.FAILURE) {
@@@ -581,13 -585,19 +588,21 @@@
          }
      }
  
+     private static String threadName(ILSMIOOperation operation) {
 -        if (operation.getIOOpertionType() == LSMIOOperationType.NOOP) {
 -            return String.valueOf(operation.getIOOpertionType());
++        if (operation.getIOOperationType() == LSMIOOperationType.NOOP) {
++            return String.valueOf(operation.getIOOperationType());
+         }
 -        return operation.getIOOpertionType() + ":" + 
operation.getTarget().getRelativePath() + "@"
++        return operation.getIOOperationType() + ":" + 
operation.getTarget().getRelativePath() + "@"
+                 + OP_THREAD_TIMESTAMP.format(ZonedDateTime.now());
+     }
+ 
      @Override
      public void merge(ILSMIOOperation operation) throws HyracksDataException {
 -        LOGGER.debug("Started a merge operation for index {}", lsmIndex);
 +        List<ILSMComponent> mergingComponents = ((MergeOperation) 
operation).getMergingComponents();
 +        LOGGER.debug("Started MERGE operation {} of {} components for index: 
{}", operation, mergingComponents.size(),
 +                lsmIndex);
          synchronized (opTracker) {
 -            enterComponents(operation.getAccessor().getOpContext(), 
LSMOperationType.MERGE);
 +            enterComponents(operation.getAccessor().getOpContext(), 
LSMOperationType.MERGE, false);
          }
          ILSMDiskComponent newComponent;
          try {
@@@ -783,7 -792,7 +798,7 @@@
      @Override
      public void deleteComponents(ILSMIndexOperationContext ctx, 
Predicate<ILSMComponent> predicate)
              throws HyracksDataException {
--        ILSMIOOperation ioOperation = null;
++        ILSMIOOperation ioOperation;
          // We need to always start the component delete from current memory component.
          // This will ensure Primary and secondary component id still matches after component delete
          if (!lsmIndex.isMemoryComponentsAllocated()) {

Reply via email to