DrDub commented on code in PR #25:
URL: https://github.com/apache/uima-uimacpp/pull/25#discussion_r1726988677


##########
src/test/src/test_engine.cpp:
##########
@@ -510,7 +510,8 @@ void testCasMultiplier(uima::util::ConsoleUI & rclConsole)
     num++;
     CAS & seg = iter.next();
     failIfNotTrue(seg.getDocumentText().length() > 0);
-    pEngine->getAnnotatorContext().releaseCAS(seg);
+    // pEngine->getAnnotatorContext().releaseCAS(seg);

Review Comment:
   why keep this line commented out, just delete it.



##########
src/framework/annotator_mgr.cpp:
##########
@@ -583,7 +576,185 @@ namespace uima {
       }
 
       UIMA_ANNOTATOR_TIMING(iv_clTimerLaunchProcess.stop());
-      return(utRetVal);
+      return utRetVal;
+    }
+
+    CAS *AnnotatorManager::processUntilNextOutputCas() {
+      unique_ptr<Flow> flow{};
+      while (true) {
+        CAS *currentCas = nullptr;
+        Step nextStep;
+        flow = nullptr;
+
+        // get a cas from the stack
+        if (casIterStack.empty())
+          return nullptr;
+
+        StackFrame &frame = casIterStack.top();
+        try {
+          if (frame.casMultiplier && frame.casMultiplier->hasNext()) {
+            currentCas = &frame.casMultiplier->next();
+            // compute flow for newly produced CAS
+            flow = frame.originalFlow->newCasProduced(*currentCas, 
frame.lastEngineKey);
+          }
+        } catch (Exception &exception) {
+          if (!frame.originalFlow->continueOnFailure(frame.lastEngineKey /* 
,exception */))
+            throw;
+        }
+
+        if (!currentCas) {
+          // if there is no more output CASes from the stack, take the 
original CAS that was processed by
+          // the CAS Multiplier and continue with its flow
+          currentCas = frame.originalCas;
+          flow = std::move(frame.originalFlow);
+          currentCas->setCurrentComponentInfo(nullptr);
+          casIterStack.pop();
+        }
+
+        activeCASes.insert(currentCas);

Review Comment:
   I don't see where you remove CASes from `activeCASes`. That will lead to 
dangling pointers. The `release()` method will segfault.



##########
src/framework/annotator_mgr.cpp:
##########
@@ -583,7 +576,185 @@ namespace uima {
       }
 
       UIMA_ANNOTATOR_TIMING(iv_clTimerLaunchProcess.stop());
-      return(utRetVal);
+      return utRetVal;
+    }
+
+    CAS *AnnotatorManager::processUntilNextOutputCas() {
+      unique_ptr<Flow> flow{};
+      while (true) {
+        CAS *currentCas = nullptr;
+        Step nextStep;
+        flow = nullptr;
+
+        // get a cas from the stack
+        if (casIterStack.empty())
+          return nullptr;
+
+        StackFrame &frame = casIterStack.top();
+        try {
+          if (frame.casMultiplier && frame.casMultiplier->hasNext()) {
+            currentCas = &frame.casMultiplier->next();
+            // compute flow for newly produced CAS
+            flow = frame.originalFlow->newCasProduced(*currentCas, 
frame.lastEngineKey);
+          }
+        } catch (Exception &exception) {
+          if (!frame.originalFlow->continueOnFailure(frame.lastEngineKey /* 
,exception */))
+            throw;
+        }
+
+        if (!currentCas) {
+          // if there is no more output CASes from the stack, take the 
original CAS that was processed by
+          // the CAS Multiplier and continue with its flow
+          currentCas = frame.originalCas;
+          flow = std::move(frame.originalFlow);
+          currentCas->setCurrentComponentInfo(nullptr);
+          casIterStack.pop();
+        }
+
+        activeCASes.insert(currentCas);
+
+        if (nextStep.getType() == Step::StepType::UNSPECIFIED) {
+          nextStep = flow->next(); // get the next step for the current flow
+        }
+
+        while (nextStep.getType() != Step::StepType::FINALSTEP) {
+          if (nextStep.getType() == Step::StepType::SIMPLESTEP) {
+            // find the AE specified by the step
+            const icu::UnicodeString &nextAEKey = 
nextStep.getSimpleStep()->getEngineName();
+            auto it = std::find_if(iv_vecEntries.begin(), iv_vecEntries.end(),
+                                   [&, nextAEKey](const EngineEntry &entry) {
+                                     return 
entry.iv_pEngine->getAnnotatorContext().iv_AnCKey == nextAEKey;
+                                   });
+
+            if (it != iv_vecEntries.end()) {
+              AnalysisEngine *nextAE = it->iv_pEngine;
+              CAS *outputCas = nullptr;
+
+              // call process one the AE and see if it has produced a new CAS
+              try {
+                CASIterator casIter = 
nextAE->processAndOutputNewCASes(*currentCas);
+                if (casIter.hasNext())
+                  outputCas = &casIter.next();
+              } catch (Exception &e) {
+                if (!flow->continueOnFailure(nextAEKey))
+                  throw;
+              }
+
+              if (outputCas) {
+                // new CAS is output so put the current components on the 
stack so we can process
+                // the other output CASes and original CASes later
+                std::unique_ptr<Flow> nextFlow = 
flow->newCasProduced(*outputCas, nextAEKey);
+                casIterStack.push({nextAE, currentCas, std::move(flow), 
nextAEKey});
+                flow = std::move(nextFlow);
+                currentCas = outputCas;
+                activeCASes.insert(currentCas);
+              } else {
+                // No new CASes are output, this CAS is done being processed 
by the current engine.
+                currentCas->setCurrentComponentInfo(nullptr);
+              }
+            } else {
+              UIMA_EXC_THROW_NEW(EngineProcessingException,
+                                 UIMA_ERR_USER_ANNOTATOR_COULD_NOT_PROCESS,
+                                 UIMA_MSG_ID_EXCON_PROCESSING_CAS,
+                                 ErrorMessage(UIMA_MSG_ID_LITERAL_STRING, 
"Unknown Delegate Key"),
+                                 ErrorInfo::unrecoverable);
+            }
+          } else if (nextStep.getType() == Step::StepType::PARALLELSTEP) {
+            // TODO: ParallelStep not supported yet
+            UIMA_EXC_THROW_NEW(NotYetImplementedException,
+                               UIMA_ERR_NOT_YET_IMPLEMENTED,
+                               UIMA_MSG_ID_EXC_NOT_YET_IMPLEMENTED,
+                               ErrorMessage(UIMA_MSG_ID_LITERAL_STRING, 
"Parallel Step not supported yet"),
+                               ErrorInfo::unrecoverable
+            );
+          } else {
+            UIMA_EXC_THROW_NEW(EngineProcessingException,
+                               UIMA_ERR_USER_ANNOTATOR_COULD_NOT_PROCESS,
+                               UIMA_MSG_ID_EXCON_PROCESSING_CAS,
+                               ErrorMessage(UIMA_MSG_ID_LITERAL_STRING, 
"Unknown Step Type"),
+                               ErrorInfo::unrecoverable);
+          }
+
+          nextStep = flow->next();
+        }
+
+        const FinalStep *finalStep = nextStep.getFinalStep();
+        if (currentCas == inputCas) {
+          if (finalStep->getForceDropCAS()) {
+            // Not allowed to drop the input CAS so something must have gone 
wrong
+            UIMA_EXC_THROW_NEW(EngineProcessingException,
+                               UIMA_ERR_USER_ANNOTATOR_COULD_NOT_PROCESS,
+                               UIMA_MSG_ID_EXCON_PROCESSING_CAS,
+                               ErrorMessage(UIMA_MSG_ID_LITERAL_STRING, 
"Illegal CAS drop"),
+                               ErrorInfo::unrecoverable);
+          }
+          return nullptr;
+        }
+
+        if (iv_bOutputNewCases && !finalStep->getForceDropCAS())

Review Comment:
   Just to double check I'm understanding the code correctly, this is where the 
`DROP_IF_NEW_CAS_PRODUCED` functionality is implemented?



##########
src/framework/annotator_mgr.cpp:
##########
@@ -583,7 +576,185 @@ namespace uima {
       }
 
       UIMA_ANNOTATOR_TIMING(iv_clTimerLaunchProcess.stop());
-      return(utRetVal);
+      return utRetVal;
+    }
+
+    CAS *AnnotatorManager::processUntilNextOutputCas() {
+      unique_ptr<Flow> flow{};
+      while (true) {
+        CAS *currentCas = nullptr;
+        Step nextStep;
+        flow = nullptr;
+
+        // get a cas from the stack
+        if (casIterStack.empty())
+          return nullptr;
+
+        StackFrame &frame = casIterStack.top();
+        try {
+          if (frame.casMultiplier && frame.casMultiplier->hasNext()) {
+            currentCas = &frame.casMultiplier->next();
+            // compute flow for newly produced CAS
+            flow = frame.originalFlow->newCasProduced(*currentCas, 
frame.lastEngineKey);
+          }
+        } catch (Exception &exception) {
+          if (!frame.originalFlow->continueOnFailure(frame.lastEngineKey /* 
,exception */))
+            throw;
+        }
+
+        if (!currentCas) {
+          // if there is no more output CASes from the stack, take the 
original CAS that was processed by
+          // the CAS Multiplier and continue with its flow
+          currentCas = frame.originalCas;
+          flow = std::move(frame.originalFlow);
+          currentCas->setCurrentComponentInfo(nullptr);
+          casIterStack.pop();
+        }
+
+        activeCASes.insert(currentCas);
+
+        if (nextStep.getType() == Step::StepType::UNSPECIFIED) {
+          nextStep = flow->next(); // get the next step for the current flow
+        }
+
+        while (nextStep.getType() != Step::StepType::FINALSTEP) {
+          if (nextStep.getType() == Step::StepType::SIMPLESTEP) {
+            // find the AE specified by the step
+            const icu::UnicodeString &nextAEKey = 
nextStep.getSimpleStep()->getEngineName();
+            auto it = std::find_if(iv_vecEntries.begin(), iv_vecEntries.end(),
+                                   [&, nextAEKey](const EngineEntry &entry) {
+                                     return 
entry.iv_pEngine->getAnnotatorContext().iv_AnCKey == nextAEKey;
+                                   });
+
+            if (it != iv_vecEntries.end()) {
+              AnalysisEngine *nextAE = it->iv_pEngine;
+              CAS *outputCas = nullptr;
+
+              // call process one the AE and see if it has produced a new CAS
+              try {
+                CASIterator casIter = 
nextAE->processAndOutputNewCASes(*currentCas);
+                if (casIter.hasNext())
+                  outputCas = &casIter.next();
+              } catch (Exception &e) {
+                if (!flow->continueOnFailure(nextAEKey))
+                  throw;
+              }
+
+              if (outputCas) {
+                // new CAS is output so put the current components on the 
stack so we can process
+                // the other output CASes and original CASes later
+                std::unique_ptr<Flow> nextFlow = 
flow->newCasProduced(*outputCas, nextAEKey);
+                casIterStack.push({nextAE, currentCas, std::move(flow), 
nextAEKey});
+                flow = std::move(nextFlow);
+                currentCas = outputCas;
+                activeCASes.insert(currentCas);
+              } else {
+                // No new CASes are output, this CAS is done being processed 
by the current engine.
+                currentCas->setCurrentComponentInfo(nullptr);
+              }
+            } else {
+              UIMA_EXC_THROW_NEW(EngineProcessingException,
+                                 UIMA_ERR_USER_ANNOTATOR_COULD_NOT_PROCESS,
+                                 UIMA_MSG_ID_EXCON_PROCESSING_CAS,
+                                 ErrorMessage(UIMA_MSG_ID_LITERAL_STRING, 
"Unknown Delegate Key"),

Review Comment:
   any chances of including the key itself in the error message?



##########
src/test/data/descriptors/SegmentAnnotateMerge.xml:
##########
@@ -77,12 +77,12 @@
                     </array>
                 </value>
             </nameValuePair>
-            <nameValuePair>

Review Comment:
   this seems like something didn't need to be committed.



##########
src/framework/annotator_mgr.cpp:
##########
@@ -583,7 +576,185 @@ namespace uima {
       }
 
       UIMA_ANNOTATOR_TIMING(iv_clTimerLaunchProcess.stop());
-      return(utRetVal);
+      return utRetVal;
+    }
+
+    CAS *AnnotatorManager::processUntilNextOutputCas() {
+      unique_ptr<Flow> flow{};
+      while (true) {
+        CAS *currentCas = nullptr;
+        Step nextStep;
+        flow = nullptr;
+
+        // get a cas from the stack
+        if (casIterStack.empty())
+          return nullptr;
+
+        StackFrame &frame = casIterStack.top();
+        try {
+          if (frame.casMultiplier && frame.casMultiplier->hasNext()) {
+            currentCas = &frame.casMultiplier->next();
+            // compute flow for newly produced CAS
+            flow = frame.originalFlow->newCasProduced(*currentCas, 
frame.lastEngineKey);
+          }
+        } catch (Exception &exception) {
+          if (!frame.originalFlow->continueOnFailure(frame.lastEngineKey /* 
,exception */))
+            throw;
+        }
+
+        if (!currentCas) {
+          // if there is no more output CASes from the stack, take the 
original CAS that was processed by
+          // the CAS Multiplier and continue with its flow
+          currentCas = frame.originalCas;
+          flow = std::move(frame.originalFlow);
+          currentCas->setCurrentComponentInfo(nullptr);
+          casIterStack.pop();
+        }
+
+        activeCASes.insert(currentCas);
+
+        if (nextStep.getType() == Step::StepType::UNSPECIFIED) {
+          nextStep = flow->next(); // get the next step for the current flow
+        }
+
+        while (nextStep.getType() != Step::StepType::FINALSTEP) {
+          if (nextStep.getType() == Step::StepType::SIMPLESTEP) {
+            // find the AE specified by the step
+            const icu::UnicodeString &nextAEKey = 
nextStep.getSimpleStep()->getEngineName();
+            auto it = std::find_if(iv_vecEntries.begin(), iv_vecEntries.end(),
+                                   [&, nextAEKey](const EngineEntry &entry) {
+                                     return 
entry.iv_pEngine->getAnnotatorContext().iv_AnCKey == nextAEKey;
+                                   });
+
+            if (it != iv_vecEntries.end()) {
+              AnalysisEngine *nextAE = it->iv_pEngine;
+              CAS *outputCas = nullptr;
+
+              // call process one the AE and see if it has produced a new CAS
+              try {
+                CASIterator casIter = 
nextAE->processAndOutputNewCASes(*currentCas);
+                if (casIter.hasNext())
+                  outputCas = &casIter.next();
+              } catch (Exception &e) {
+                if (!flow->continueOnFailure(nextAEKey))
+                  throw;
+              }
+
+              if (outputCas) {
+                // new CAS is output so put the current components on the 
stack so we can process
+                // the other output CASes and original CASes later
+                std::unique_ptr<Flow> nextFlow = 
flow->newCasProduced(*outputCas, nextAEKey);
+                casIterStack.push({nextAE, currentCas, std::move(flow), 
nextAEKey});
+                flow = std::move(nextFlow);
+                currentCas = outputCas;
+                activeCASes.insert(currentCas);
+              } else {
+                // No new CASes are output, this CAS is done being processed 
by the current engine.
+                currentCas->setCurrentComponentInfo(nullptr);
+              }
+            } else {
+              UIMA_EXC_THROW_NEW(EngineProcessingException,
+                                 UIMA_ERR_USER_ANNOTATOR_COULD_NOT_PROCESS,
+                                 UIMA_MSG_ID_EXCON_PROCESSING_CAS,
+                                 ErrorMessage(UIMA_MSG_ID_LITERAL_STRING, 
"Unknown Delegate Key"),
+                                 ErrorInfo::unrecoverable);
+            }
+          } else if (nextStep.getType() == Step::StepType::PARALLELSTEP) {
+            // TODO: ParallelStep not supported yet
+            UIMA_EXC_THROW_NEW(NotYetImplementedException,
+                               UIMA_ERR_NOT_YET_IMPLEMENTED,
+                               UIMA_MSG_ID_EXC_NOT_YET_IMPLEMENTED,
+                               ErrorMessage(UIMA_MSG_ID_LITERAL_STRING, 
"Parallel Step not supported yet"),
+                               ErrorInfo::unrecoverable
+            );
+          } else {
+            UIMA_EXC_THROW_NEW(EngineProcessingException,
+                               UIMA_ERR_USER_ANNOTATOR_COULD_NOT_PROCESS,
+                               UIMA_MSG_ID_EXCON_PROCESSING_CAS,
+                               ErrorMessage(UIMA_MSG_ID_LITERAL_STRING, 
"Unknown Step Type"),
+                               ErrorInfo::unrecoverable);
+          }
+
+          nextStep = flow->next();
+        }
+
+        const FinalStep *finalStep = nextStep.getFinalStep();
+        if (currentCas == inputCas) {
+          if (finalStep->getForceDropCAS()) {
+            // Not allowed to drop the input CAS so something must have gone 
wrong
+            UIMA_EXC_THROW_NEW(EngineProcessingException,
+                               UIMA_ERR_USER_ANNOTATOR_COULD_NOT_PROCESS,
+                               UIMA_MSG_ID_EXCON_PROCESSING_CAS,
+                               ErrorMessage(UIMA_MSG_ID_LITERAL_STRING, 
"Illegal CAS drop"),
+                               ErrorInfo::unrecoverable);
+          }
+          return nullptr;
+        }
+
+        if (iv_bOutputNewCases && !finalStep->getForceDropCAS())
+          return currentCas;
+        currentCas->release();
+      }
+    }
+
+    bool AnnotatorManager::hasNext() {
+      if (!nextCas)
+        nextCas = processUntilNextOutputCas();
+      return nextCas != nullptr;
+    }
+
+    CAS & AnnotatorManager::next() {
+      CAS* result = nextCas;
+      if (!result)
+        result = processUntilNextOutputCas();
+      if (!result) {
+        UIMA_EXC_THROW_NEW(Exception,

Review Comment:
   shouldn't this call also `release()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@uima.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to