This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 5f18a7e3fe9745f9b73c1a6a46f152100ac15778
Merge: 2292810 66aaf41
Author: Michael Blow <[email protected]>
AuthorDate: Wed Dec 11 11:40:07 2019 -0500

    Merge branch 'gerrit/stabilization-f69489' into 'gerrit/mad-hatter'
    
    Change-Id: I17edb6b03563ac527bcad39f93193067ee42a33d

 .../join-ASTERIXDB-2686.1.ddl.sqlpp                    | 16 ++++++++++------
 .../join-ASTERIXDB-2686.2.update.sqlpp                 | 11 +++++------
 .../join-ASTERIXDB-2686.3.query.sqlpp                  | 10 ++++------
 .../join-ASTERIXDB-2686.4.ddl.sqlpp                    |  9 +--------
 .../misc/join-ASTERIXDB-2686/join-ASTERIXDB-2686.1.adm |  1 +
 .../src/test/resources/runtimets/testsuite_sqlpp.xml   |  5 +++++
 .../apache/hyracks/api/io/IWorkspaceFileFactory.java   | 16 ++++++++++++++++
 .../ExternalGroupWriteOperatorNodePushable.java        |  3 +--
 .../OptimizedHybridHashJoinOperatorDescriptor.java     | 18 ++++++++++++++++++
 .../std/sort/AbstractExternalSortRunMerger.java        |  2 +-
 10 files changed, 62 insertions(+), 29 deletions(-)

diff --cc asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 24988bb,817bab6..cc5d18d
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@@ -5887,15 -3801,10 +5887,20 @@@
        </compilation-unit>
      </test-case>
      <test-case FilePath="misc">
 +      <compilation-unit name="field_access_union-ASTERIXDB-2288">
 +        <output-dir compare="Text">field_access_union-ASTERIXDB-2288</output-dir>
 +      </compilation-unit>
 +    </test-case>
 +    <test-case FilePath="misc">
 +      <compilation-unit name="constant_folding">
 +        <output-dir compare="Text">constant_folding</output-dir>
 +      </compilation-unit>
 +    </test-case>
++    <test-case FilePath="misc">
+       <compilation-unit name="join-ASTERIXDB-2686">
+         <output-dir compare="Text">join-ASTERIXDB-2686</output-dir>
+       </compilation-unit>
+     </test-case>
      <test-case FilePath="misc">
        <compilation-unit name="poll-dynamic">
          <output-dir compare="Text">poll-dynamic</output-dir>
diff --cc hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
index 08b2303,f3e9320..1beaab8
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
@@@ -72,75 -74,113 +72,75 @@@ public abstract class AbstractExternalS
          this.comparators = comparators;
          this.nmkComputer = nmkComputer;
          this.recordDesc = recordDesc;
 -        this.framesLimit = framesLimit;
 -        this.writer = writer;
 +        this.maxMergeWidth = framesLimit - 1;
          this.topK = topK;
 +        this.first = true;
      }
  
 -    public void process() throws HyracksDataException {
 -        IFrameWriter finalWriter = null;
 +    @CriticalPath
 +    public void process(IFrameWriter finalWriter) throws HyracksDataException {
          try {
 -            if (runs.isEmpty()) {
 -                finalWriter = prepareSkipMergingFinalResultWriter(writer);
 -                finalWriter.open();
 -                if (sorter != null) {
 -                    try {
 -                        if (sorter.hasRemaining()) {
 -                            sorter.flush(finalWriter);
 -                        }
 -                    } finally {
 -                        sorter.close();
 -                    }
 -                }
 -            } else {
 -                /** recycle sort buffer */
 -                if (sorter != null) {
 -                    sorter.close();
 -                }
 -
 -                finalWriter = prepareFinalMergeResultWriter(writer);
 -                finalWriter.open();
 -
 -                int maxMergeWidth = framesLimit - 1;
 -
 -                inFrames = new ArrayList<>(maxMergeWidth);
 -                outputFrame = new VSizeFrame(ctx);
 -                List<GeneratedRunFileReader> partialRuns = new ArrayList<>(maxMergeWidth);
 -
 -                int stop = runs.size();
 -                currentGenerationRunAvailable.set(0, stop);
 -
 -                while (true) {
 -
 -                    int unUsed = selectPartialRuns(maxMergeWidth * ctx.getInitialFrameSize(), runs, partialRuns,
 -                            currentGenerationRunAvailable, stop);
 -                    prepareFrames(unUsed, inFrames, partialRuns);
 -
 -                    if (!currentGenerationRunAvailable.isEmpty() || stop < runs.size()) {
 -                        GeneratedRunFileReader reader;
 -                        if (partialRuns.size() == 1) {
 -                            if (!currentGenerationRunAvailable.isEmpty()) {
 -                                throw new HyracksDataException(
 -                                        "The record is too big to put into the merging frame, please"
 -                                                + " allocate more sorting memory");
 -                            } else {
 -                                reader = partialRuns.get(0);
 -                            }
 -
 +            createReusableObjects();
 +            int stop = runs.size();
 +            currentGenerationRunAvailable.set(0, stop);
 +            int numberOfPasses = 1;
 +            while (true) {
 +                int unUsed = selectPartialRuns(maxMergeWidth * ctx.getInitialFrameSize(), runs, partialRuns,
 +                        currentGenerationRunAvailable, stop);
 +                prepareFrames(unUsed, inFrames, partialRuns);
 +
 +                if (!currentGenerationRunAvailable.isEmpty() || stop < runs.size()) {
 +                    GeneratedRunFileReader reader;
 +                    if (partialRuns.size() == 1) {
 +                        if (!currentGenerationRunAvailable.isEmpty()) {
 +                            throw new HyracksDataException("The record is too big to put into the merging frame, please"
 +                                    + " allocate more sorting memory");
                          } else {
 -                            RunFileWriter mergeFileWriter = prepareIntermediateMergeRunFile();
 -                            IFrameWriter mergeResultWriter = prepareIntermediateMergeResultWriter(mergeFileWriter);
 -
 -                            try {
 -                                mergeResultWriter.open();
 -                                merge(mergeResultWriter, partialRuns);
 -                            } catch (Throwable t) {
 -                                mergeResultWriter.fail();
 -                                throw t;
 -                            } finally {
 -                                mergeResultWriter.close();
 -                            }
 -                            reader = mergeFileWriter.createDeleteOnCloseReader();
 +                            reader = partialRuns.get(0);
                          }
 -                        runs.add(reader);
 -
 -                        if (currentGenerationRunAvailable.isEmpty()) {
 +                    } else {
 +                        RunFileWriter mergeFileWriter = prepareIntermediateMergeRunFile();
 +                        IFrameWriter mergeResultWriter = prepareIntermediateMergeResultWriter(mergeFileWriter);
 +
 +                        try {
 +                            mergeResultWriter.open();
 +                            merge(mergeResultWriter, partialRuns);
 +                        } catch (Throwable t) {
 +                            mergeResultWriter.fail();
 +                            throw t;
 +                        } finally {
 +                            mergeResultWriter.close();
 +                        }
-                         reader = mergeFileWriter.createReader();
++                        reader = mergeFileWriter.createDeleteOnCloseReader();
 +                    }
 +                    runs.add(reader);
  
 -                            if (LOGGER.isDebugEnabled()) {
 -                                LOGGER.debug("generated runs:" + stop);
 -                            }
 -                            runs.subList(0, stop).clear();
 -                            currentGenerationRunAvailable.clear();
 -                            currentGenerationRunAvailable.set(0, runs.size());
 -                            stop = runs.size();
 +                    if (currentGenerationRunAvailable.isEmpty()) {
 +                        numberOfPasses++;
 +                        if (LOGGER.isDebugEnabled()) {
 +                            LOGGER.debug("generated runs:" + stop);
                          }
 -                    } else {
 +                        runs.subList(0, stop).clear();
 +                        currentGenerationRunAvailable.clear();
 +                        currentGenerationRunAvailable.set(0, runs.size());
 +                        stop = runs.size();
 +                    }
 +                } else {
 +                    if (LOGGER.isDebugEnabled()) {
                          LOGGER.debug("final runs: {}", stop);
 -                        merge(finalWriter, partialRuns);
 -                        break;
 +                        LOGGER.debug("number of passes: " + numberOfPasses);
                      }
 +                    merge(finalWriter, partialRuns);
 +                    break;
                  }
              }
 -        } catch (Exception e) {
 -            if (finalWriter != null) {
 -                finalWriter.fail();
 -            }
 -            throw HyracksDataException.create(e);
          } finally {
 -            try {
 -                if (finalWriter != null) {
 -                    finalWriter.close();
 -                }
 -            } finally {
 -                for (RunFileReader reader : runs) {
 -                    try {
 -                        reader.close(); // close is idempotent.
 -                    } catch (Exception e) {
 -                        if (LOGGER.isWarnEnabled()) {
 -                            LOGGER.log(Level.WARN, e.getMessage(), e);
 -                        }
 +            for (RunFileReader reader : runs) {
 +                try {
 +                    reader.close(); // close is idempotent.
 +                } catch (Exception e) {
 +                    if (LOGGER.isWarnEnabled()) {
 +                        LOGGER.log(Level.WARN, e.getMessage(), e);
                      }
                  }
              }

Reply via email to