This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 5f18a7e3fe9745f9b73c1a6a46f152100ac15778 Merge: 2292810 66aaf41 Author: Michael Blow <[email protected]> AuthorDate: Wed Dec 11 11:40:07 2019 -0500 Merge branch 'gerrit/stabilization-f69489' into 'gerrit/mad-hatter' Change-Id: I17edb6b03563ac527bcad39f93193067ee42a33d .../join-ASTERIXDB-2686.1.ddl.sqlpp | 16 ++++++++++------ .../join-ASTERIXDB-2686.2.update.sqlpp | 11 +++++------ .../join-ASTERIXDB-2686.3.query.sqlpp | 10 ++++------ .../join-ASTERIXDB-2686.4.ddl.sqlpp | 9 +-------- .../misc/join-ASTERIXDB-2686/join-ASTERIXDB-2686.1.adm | 1 + .../src/test/resources/runtimets/testsuite_sqlpp.xml | 5 +++++ .../apache/hyracks/api/io/IWorkspaceFileFactory.java | 16 ++++++++++++++++ .../ExternalGroupWriteOperatorNodePushable.java | 3 +-- .../OptimizedHybridHashJoinOperatorDescriptor.java | 18 ++++++++++++++++++ .../std/sort/AbstractExternalSortRunMerger.java | 2 +- 10 files changed, 62 insertions(+), 29 deletions(-) diff --cc asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index 24988bb,817bab6..cc5d18d --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@@ -5887,15 -3801,10 +5887,20 @@@ </compilation-unit> </test-case> <test-case FilePath="misc"> + <compilation-unit name="field_access_union-ASTERIXDB-2288"> + <output-dir compare="Text">field_access_union-ASTERIXDB-2288</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="misc"> + <compilation-unit name="constant_folding"> + <output-dir compare="Text">constant_folding</output-dir> + </compilation-unit> + </test-case> ++ <test-case FilePath="misc"> + <compilation-unit name="join-ASTERIXDB-2686"> + <output-dir compare="Text">join-ASTERIXDB-2686</output-dir> + </compilation-unit> + </test-case> <test-case FilePath="misc"> <compilation-unit name="poll-dynamic"> <output-dir compare="Text">poll-dynamic</output-dir> diff --cc hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java index 08b2303,f3e9320..1beaab8 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java @@@ -72,75 -74,113 +72,75 @@@ public abstract class AbstractExternalS this.comparators = comparators; this.nmkComputer = nmkComputer; this.recordDesc = recordDesc; - this.framesLimit = framesLimit; - this.writer = writer; + this.maxMergeWidth = framesLimit - 1; this.topK = topK; + this.first = true; } - public void process() throws HyracksDataException { - IFrameWriter finalWriter = null; + @CriticalPath + public void process(IFrameWriter finalWriter) throws HyracksDataException { try { - if (runs.isEmpty()) { - finalWriter = prepareSkipMergingFinalResultWriter(writer); - finalWriter.open(); - if (sorter != null) { - try { - if (sorter.hasRemaining()) { - sorter.flush(finalWriter); - } - } finally { - sorter.close(); - } - } - } else { - /** recycle sort buffer */ - if (sorter != null) { - sorter.close(); - } - - finalWriter = prepareFinalMergeResultWriter(writer); - finalWriter.open(); - - int maxMergeWidth = framesLimit - 1; - - inFrames = new ArrayList<>(maxMergeWidth); - outputFrame = new VSizeFrame(ctx); - List<GeneratedRunFileReader> partialRuns = new ArrayList<>(maxMergeWidth); - - int stop = runs.size(); - currentGenerationRunAvailable.set(0, stop); - - while (true) { - - int unUsed = selectPartialRuns(maxMergeWidth * ctx.getInitialFrameSize(), runs, partialRuns, - currentGenerationRunAvailable, stop); - prepareFrames(unUsed, inFrames, partialRuns); - - if (!currentGenerationRunAvailable.isEmpty() || stop < runs.size()) { - GeneratedRunFileReader reader; - if (partialRuns.size() == 1) { - if (!currentGenerationRunAvailable.isEmpty()) { - throw new HyracksDataException( - "The record is too big to put into the merging frame, please" - + " allocate more sorting memory"); - } else { - reader = partialRuns.get(0); - } - + createReusableObjects(); + int stop = runs.size(); + currentGenerationRunAvailable.set(0, stop); + int numberOfPasses = 1; + while (true) { + int unUsed = selectPartialRuns(maxMergeWidth * ctx.getInitialFrameSize(), runs, partialRuns, + currentGenerationRunAvailable, stop); + prepareFrames(unUsed, inFrames, partialRuns); + + if (!currentGenerationRunAvailable.isEmpty() || stop < runs.size()) { + GeneratedRunFileReader reader; + if (partialRuns.size() == 1) { + if (!currentGenerationRunAvailable.isEmpty()) { + throw new HyracksDataException("The record is too big to put into the merging frame, please" + + " allocate more sorting memory"); } else { - RunFileWriter mergeFileWriter = prepareIntermediateMergeRunFile(); - IFrameWriter mergeResultWriter = prepareIntermediateMergeResultWriter(mergeFileWriter); - - try { - mergeResultWriter.open(); - merge(mergeResultWriter, partialRuns); - } catch (Throwable t) { - mergeResultWriter.fail(); - throw t; - } finally { - mergeResultWriter.close(); - } - reader = mergeFileWriter.createDeleteOnCloseReader(); + reader = partialRuns.get(0); } - runs.add(reader); - - if (currentGenerationRunAvailable.isEmpty()) { + } else { + RunFileWriter mergeFileWriter = prepareIntermediateMergeRunFile(); + IFrameWriter mergeResultWriter = prepareIntermediateMergeResultWriter(mergeFileWriter); + + try { + mergeResultWriter.open(); + merge(mergeResultWriter, partialRuns); + } catch (Throwable t) { + mergeResultWriter.fail(); + throw t; + } finally { + mergeResultWriter.close(); + } - reader = mergeFileWriter.createReader(); ++ reader = mergeFileWriter.createDeleteOnCloseReader(); + } + runs.add(reader); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("generated runs:" + stop); - } - runs.subList(0, stop).clear(); - currentGenerationRunAvailable.clear(); - currentGenerationRunAvailable.set(0, runs.size()); - stop = runs.size(); + if (currentGenerationRunAvailable.isEmpty()) { + numberOfPasses++; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("generated runs:" + stop); } - } else { + runs.subList(0, stop).clear(); + currentGenerationRunAvailable.clear(); + currentGenerationRunAvailable.set(0, runs.size()); + stop = runs.size(); + } + } else { + if (LOGGER.isDebugEnabled()) { LOGGER.debug("final runs: {}", stop); - merge(finalWriter, partialRuns); - break; + LOGGER.debug("number of passes: " + numberOfPasses); } + merge(finalWriter, partialRuns); + break; } } - } catch (Exception e) { - if (finalWriter != null) { - finalWriter.fail(); - } - throw HyracksDataException.create(e); } finally { - try { - if (finalWriter != null) { - finalWriter.close(); - } - } finally { - for (RunFileReader reader : runs) { - try { - reader.close(); // close is idempotent. - } catch (Exception e) { - if (LOGGER.isWarnEnabled()) { - LOGGER.log(Level.WARN, e.getMessage(), e); - } + for (RunFileReader reader : runs) { + try { + reader.close(); // close is idempotent. + } catch (Exception e) { + if (LOGGER.isWarnEnabled()) { + LOGGER.log(Level.WARN, e.getMessage(), e); } } }
