This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 95f77257b45373d2bfc65d85d41559305283d3cb Merge: 384570e475 e27f6f95ad Author: Michael Blow <[email protected]> AuthorDate: Wed Oct 15 19:58:53 2025 -0400 Merge branch 'gerrit/trinity' into 'gerrit/phoenix' * [NO ISSUE][*DB][EXT] Make IDataParser extend Closeable, misc * [NO ISSUE][EXT]: Use ABFSS instead of deprecated WASBS * [NO ISSUE][*DB][STO] Fix IndexOutOfBoundsException removing last page... Ext-ref: MB-67819 Change-Id: I49f69c95163d54b4c199f234291047fd01c27775 .../org/apache/asterix/active/ActiveManager.java | 1 + .../apache/asterix/external/api/IDataParser.java | 13 ++++++--- .../asterix/external/api/IRecordConverter.java | 8 +++++- .../dataflow/FeedRecordDataFlowController.java | 2 ++ .../util/azure/blob_storage/AzureConstants.java | 2 +- asterixdb/pom.xml | 16 ++++------- .../std/util/ByteArrayAccessibleOutputStream.java | 18 +++++++++++++ .../apache/hyracks/http/server/utils/HttpUtil.java | 6 +++-- .../dataflow/IndexDropOperatorNodePushable.java | 2 +- .../storage/am/lsm/common/impls/LSMHarness.java | 31 ++++++++++++++++------ 10 files changed, 71 insertions(+), 28 deletions(-) diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java index e4ab8bdd16,abb41f2a0f..792fb4c4ea --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java @@@ -18,9 -18,24 +18,15 @@@ */ package org.apache.asterix.external.api; + import java.io.Closeable; import java.io.IOException; -import java.util.function.LongSupplier; @FunctionalInterface - public interface IRecordConverter<I, O> { + public interface IRecordConverter<I, O> extends Closeable { + O convert(IRawRecord<? extends I> input) throws IOException; + - /** - * Configures the converter with information suppliers from the {@link IRecordReader} data source. - * - * @param lineNumber line number supplier - */ - default void configure(LongSupplier lineNumber) { - } - + default void close() { + // default no-op + } } diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index b1660a500b,236d853b4c..09b2cecc9b --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@@ -19,12 -19,11 +19,15 @@@ package org.apache.hyracks.storage.am.lsm.common.impls; +import static org.apache.hyracks.util.ExitUtil.EC_INCONSISTENT_STORAGE_REFERENCES; + + import java.time.ZoneId; + import java.time.ZonedDateTime; + import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; @@@ -144,14 -136,11 +149,12 @@@ public class LSMHarness implements ILSM int numEntered = 0; boolean entranceSuccessful = false; try { -- final int componentsCount = components.size(); -- for (int i = 0; i < componentsCount; i++) { -- final ILSMComponent component = components.get(i); ++ for (final ILSMComponent component : components) { boolean isMutableComponent = numEntered == 0 && component.getType() == LSMComponentType.MEMORY; if (!component.threadEnter(opType, isMutableComponent)) { + if (dumpState) { + LOGGER.warn("couldn't enter component: {}", component.dumpState()); + } break; } numEntered++; @@@ -262,13 -246,43 +265,13 @@@ } } } finally { - /* - * cleanup inactive disk components if any - */ - if (inactiveDiskComponentsToBeDeleted != null) { - try { - //schedule a replication job to delete these inactive disk components from replicas - if (replicationEnabled) { - lsmIndex.scheduleReplication(null, inactiveDiskComponentsToBeDeleted, - ReplicationOperation.DELETE, opType); - } - for (ILSMDiskComponent c : inactiveDiskComponentsToBeDeleted) { - c.deactivateAndDestroy(); - } - } catch (Throwable e) { // NOSONAR Log and re-throw - if (LOGGER.isWarnEnabled()) { - LOGGER.log(Level.WARN, "Failure scheduling replication or destroying merged component", e); - } - throw e; // NOSONAR: The last call in the finally clause - } - } + // the memory components clean up must be done first to avoid any unexpected exceptions during the rest + // of the finally block if (inactiveMemoryComponentsToBeCleanedUp != null) { - for (ILSMMemoryComponent c : inactiveMemoryComponentsToBeCleanedUp) { - tracer.instant(c.toString(), traceCategory, Scope.p, lsmIndex::toString); - c.cleanup(); - synchronized (opTracker) { - c.reset(); - // Notify all waiting threads whenever the mutable component's state - // has changed to inactive. This is important because even though we switched - // the mutable components, it is possible that the component that we just - // switched to is still busy flushing its data to disk. Thus, the notification - // that was issued upon scheduling the flush is not enough. - opTracker.notifyAll(); // NOSONAR: Always called inside synchronized block - } - } + cleanupInactiveMemoryComponents(inactiveMemoryComponentsToBeCleanedUp); } - if (opType == LSMOperationType.FLUSH) { - ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().get(0); + if (opType == LSMOperationType.FLUSH && !failedOperation) { - ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().get(0); ++ ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().getFirst(); // We must call flushed without synchronizing on opTracker to avoid deadlocks flushingComponent.flushed(); } @@@ -383,7 -387,7 +386,7 @@@ } getAndEnterComponents(ctx, LSMOperationType.MODIFICATION, false); try { -- AbstractLSMMemoryComponent c = (AbstractLSMMemoryComponent) ctx.getComponentHolder().get(0); ++ AbstractLSMMemoryComponent c = (AbstractLSMMemoryComponent) ctx.getComponentHolder().getFirst(); c.getMetadata().put(key, value); c.setModified(); } finally { @@@ -407,7 -411,7 +410,7 @@@ } getAndEnterComponents(ctx, LSMOperationType.FORCE_MODIFICATION, false); try { -- AbstractLSMMemoryComponent c = (AbstractLSMMemoryComponent) ctx.getComponentHolder().get(0); ++ AbstractLSMMemoryComponent c = (AbstractLSMMemoryComponent) ctx.getComponentHolder().getFirst(); c.getMetadata().put(key, value); c.setModified(); } finally { @@@ -427,7 -431,7 +430,8 @@@ try { lsmIndex.modify(ctx, tuple); // The mutable component is always in the first index. -- AbstractLSMMemoryComponent mutableComponent = (AbstractLSMMemoryComponent) ctx.getComponentHolder().get(0); ++ AbstractLSMMemoryComponent mutableComponent = ++ (AbstractLSMMemoryComponent) ctx.getComponentHolder().getFirst(); mutableComponent.setModified(); } catch (Exception e) { failedOperation = true; @@@ -547,9 -549,11 +551,11 @@@ } public void doIo(ILSMIOOperation operation) { + String origName = Thread.currentThread().getName(); try { + Thread.currentThread().setName(threadName(operation)); operation.getCallback().beforeOperation(operation); - ILSMDiskComponent newComponent = operation.getIOOpertionType() == LSMIOOperationType.FLUSH + ILSMDiskComponent newComponent = operation.getIOOperationType() == LSMIOOperationType.FLUSH ? lsmIndex.flush(operation) : lsmIndex.merge(operation); operation.setNewComponent(newComponent); operation.getCallback().afterOperation(operation); @@@ -569,10 -573,11 +575,11 @@@ operation.setStatus(LSMIOOperationStatus.FAILURE); operation.setFailure(th); if (LOGGER.isErrorEnabled()) { - LOGGER.log(Level.ERROR, "{} operation.afterFinalize failed on {}", operation.getIOOpertionType(), - lsmIndex, th); + LOGGER.error("{} operation.afterFinalize failed on {}", operation.getIOOperationType(), lsmIndex, + th); } } + Thread.currentThread().setName(origName); } // if the operation failed, we need to cleanup files if (operation.getStatus() == LSMIOOperationStatus.FAILURE) { @@@ -581,13 -585,19 +588,21 @@@ } } + private static String threadName(ILSMIOOperation operation) { - if (operation.getIOOpertionType() == LSMIOOperationType.NOOP) { - return String.valueOf(operation.getIOOpertionType()); ++ if (operation.getIOOperationType() == LSMIOOperationType.NOOP) { ++ return String.valueOf(operation.getIOOperationType()); + } - return operation.getIOOpertionType() + ":" + operation.getTarget().getRelativePath() + "@" ++ return operation.getIOOperationType() + ":" + operation.getTarget().getRelativePath() + "@" + + OP_THREAD_TIMESTAMP.format(ZonedDateTime.now()); + } + @Override public void merge(ILSMIOOperation operation) throws HyracksDataException { - LOGGER.debug("Started a merge operation for index {}", lsmIndex); + List<ILSMComponent> mergingComponents = ((MergeOperation) operation).getMergingComponents(); + LOGGER.debug("Started MERGE operation {} of {} components for index: {}", operation, mergingComponents.size(), + lsmIndex); synchronized (opTracker) { - enterComponents(operation.getAccessor().getOpContext(), LSMOperationType.MERGE); + enterComponents(operation.getAccessor().getOpContext(), LSMOperationType.MERGE, false); } ILSMDiskComponent newComponent; try { @@@ -783,7 -792,7 +798,7 @@@ @Override public void deleteComponents(ILSMIndexOperationContext ctx, Predicate<ILSMComponent> predicate) throws HyracksDataException { -- ILSMIOOperation ioOperation = null; ++ ILSMIOOperation ioOperation; // We need to always start the component delete from current memory component. // This will ensure Primary and secondary component id still matches after component delete if (!lsmIndex.isMemoryComponentsAllocated()) {
