BATCHEE-88 improve handling in case of error during step commit
Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/7a74df4b Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/7a74df4b Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/7a74df4b Branch: refs/heads/master Commit: 7a74df4bc9295377ab35ea53eacb8204698cd425 Parents: 4313c77 Author: Mark Struberg <[email protected]> Authored: Sun May 15 21:57:00 2016 +0200 Committer: Mark Struberg <[email protected]> Committed: Sun May 15 21:57:00 2016 +0200 ---------------------------------------------------------------------- .../batchee/its/transaction/TxErrorTest.java | 29 ++++++++++++++-- .../resources/META-INF/batch-jobs/txtest1.xml | 3 -- .../impl/controller/BaseStepController.java | 17 ++++++++-- .../controller/chunk/CheckpointManager.java | 28 +++++++++++++--- .../controller/chunk/ChunkStepController.java | 35 +++++++++++++++----- 5 files changed, 92 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/7a74df4b/integration-tests/transaction/src/test/java/org/apache/batchee/its/transaction/TxErrorTest.java ---------------------------------------------------------------------- diff --git a/integration-tests/transaction/src/test/java/org/apache/batchee/its/transaction/TxErrorTest.java b/integration-tests/transaction/src/test/java/org/apache/batchee/its/transaction/TxErrorTest.java index acdbd51..564c7a7 100644 --- a/integration-tests/transaction/src/test/java/org/apache/batchee/its/transaction/TxErrorTest.java +++ b/integration-tests/transaction/src/test/java/org/apache/batchee/its/transaction/TxErrorTest.java @@ -19,6 +19,10 @@ package org.apache.batchee.its.transaction; import javax.batch.operations.JobOperator; import javax.batch.runtime.BatchRuntime; import javax.batch.runtime.BatchStatus; +import javax.batch.runtime.Metric; +import 
javax.batch.runtime.StepExecution; + +import java.util.List; import org.apache.batchee.util.Batches; import org.testng.Assert; @@ -30,8 +34,27 @@ public class TxErrorTest { @Test public void testRolledBackDuringWork() { final JobOperator jobOperator = BatchRuntime.getJobOperator(); - BatchStatus batchStatus = Batches.waitFor(jobOperator, jobOperator.start("txtest1", null)); - Assert.assertEquals(batchStatus, BatchStatus.COMPLETED); - Assert.assertEquals(TxErrorWriter1.written.intValue(), 5); + long executionId = jobOperator.start("txtest1", null); + BatchStatus batchStatus = Batches.waitFor(jobOperator, executionId); + Assert.assertEquals(batchStatus, BatchStatus.FAILED); + Assert.assertEquals(TxErrorWriter1.written.intValue(), 3); + + List<StepExecution> stepExecutions = jobOperator.getStepExecutions(executionId); + Assert.assertEquals(stepExecutions.size(), 1); + StepExecution stepExecution = stepExecutions.get(0); + Metric[] metrics = stepExecution.getMetrics(); + assertMetric(Metric.MetricType.READ_COUNT, 2, metrics); + assertMetric(Metric.MetricType.WRITE_COUNT, 2, metrics); + assertMetric(Metric.MetricType.ROLLBACK_COUNT, 1, metrics); + } + + private void assertMetric(Metric.MetricType metricType, long expected, Metric[] metrics) { + for (Metric metric : metrics) { + if (metricType.equals(metric.getType())) { + Assert.assertEquals(metric.getValue(), expected); + return; + } + } + Assert.fail("MetricType " + metricType + " not in collected metrics"); } } http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/7a74df4b/integration-tests/transaction/src/test/resources/META-INF/batch-jobs/txtest1.xml ---------------------------------------------------------------------- diff --git a/integration-tests/transaction/src/test/resources/META-INF/batch-jobs/txtest1.xml b/integration-tests/transaction/src/test/resources/META-INF/batch-jobs/txtest1.xml index 4b7a67c..ba6809f 100644 --- 
a/integration-tests/transaction/src/test/resources/META-INF/batch-jobs/txtest1.xml +++ b/integration-tests/transaction/src/test/resources/META-INF/batch-jobs/txtest1.xml @@ -18,9 +18,6 @@ <chunk item-count="1"> <reader ref="org.apache.batchee.its.transaction.TxErrorReader"></reader> <writer ref="org.apache.batchee.its.transaction.TxErrorWriter1"></writer> - <skippable-exception-classes> - <include class="java.lang.Exception"/> <!-- all exceptions... --> - </skippable-exception-classes> </chunk> </step> </job> http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/7a74df4b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java ---------------------------------------------------------------------- diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java index d07d858..63381aa 100755 --- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java +++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/BaseStepController.java @@ -359,6 +359,11 @@ public abstract class BaseStepController implements ExecutionElementController { } protected void persistUserData() { + PersistentDataWrapper userData = resolveUserData(); + storeUserData(userData); + } + + protected PersistentDataWrapper resolveUserData() { final ByteArrayOutputStream persistentBAOS = new ByteArrayOutputStream(); final ObjectOutputStream persistentDataOOS; @@ -370,8 +375,16 @@ public abstract class BaseStepController implements ExecutionElementController { throw new BatchContainerServiceException("Cannot persist the persistent user data for the step.", e); } - stepStatus.setPersistentUserData(new PersistentDataWrapper(persistentBAOS.toByteArray())); - statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus); + return new 
PersistentDataWrapper(persistentBAOS.toByteArray()); + } + + protected void storeUserData(PersistentDataWrapper userData) { + try { + stepStatus.setPersistentUserData(userData); + statusManagerService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus); + } catch (final Exception e) { + throw new BatchContainerServiceException("Cannot persist the persistent user data for the step.", e); + } } protected void persistExitStatusAndEndTimestamp() { http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/7a74df4b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java ---------------------------------------------------------------------- diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java index 7e07450..584d737 100755 --- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java +++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java @@ -16,6 +16,9 @@ */ package org.apache.batchee.container.impl.controller.chunk; +import java.util.HashMap; +import java.util.Map; + import org.apache.batchee.container.exception.BatchContainerRuntimeException; import org.apache.batchee.container.exception.BatchContainerServiceException; import org.apache.batchee.spi.DataRepresentationService; @@ -75,28 +78,45 @@ public class CheckpointManager { } } - public void checkpoint() { + /** + * Collects the current checkpoint data of the ItemReader and ItemWriter + * without persisting it; storeCheckPoints persists the result via persistenceManagerService + */ + public Map<CheckpointDataKey, CheckpointData> prepareCheckpoints() { final CheckpointDataKey readerChkptDK; final CheckpointDataKey writerChkptDK; + Map<CheckpointDataKey, CheckpointData> checkpoints = new HashMap<CheckpointDataKey, CheckpointData>(2); try { byte[] 
checkpointBytes = 
dataRepresentationService.toInternalRepresentation(readerProxy.checkpointInfo()); CheckpointData readerChkptData = new CheckpointData(jobInstanceID, stepId, CheckpointType.READER); readerChkptData.setRestartToken(checkpointBytes); readerChkptDK = new CheckpointDataKey(jobInstanceID, stepId, CheckpointType.READER); - persistenceManagerService.setCheckpointData(readerChkptDK, readerChkptData); + checkpoints.put(readerChkptDK, readerChkptData); checkpointBytes = dataRepresentationService.toInternalRepresentation(writerProxy.checkpointInfo()); CheckpointData writerChkptData = new CheckpointData(jobInstanceID, stepId, CheckpointType.WRITER); writerChkptData.setRestartToken(checkpointBytes); writerChkptDK = new CheckpointDataKey(jobInstanceID, stepId, CheckpointType.WRITER); - persistenceManagerService.setCheckpointData(writerChkptDK, writerChkptData); - + checkpoints.put(writerChkptDK, writerChkptData); } catch (final Exception ex) { // is this what I should be throwing here? throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + stepId + "]", ex); } + + return checkpoints; + } + + public void storeCheckPoints(Map<CheckpointDataKey, CheckpointData> checkpoints) { + try { + for (Map.Entry<CheckpointDataKey, CheckpointData> checkpointEntry : checkpoints.entrySet()) { + persistenceManagerService.setCheckpointData(checkpointEntry.getKey(), checkpointEntry.getValue()); + } + } catch (final Exception ex) { + throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + stepId + "]", ex); + } + } public int checkpointTimeout() { http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/7a74df4b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java ---------------------------------------------------------------------- diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java 
b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java index ba256fa..6d72ccd 100755 --- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java +++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java @@ -48,10 +48,14 @@ import javax.batch.api.chunk.listener.RetryWriteListener; import javax.batch.api.chunk.listener.SkipProcessListener; import javax.batch.api.chunk.listener.SkipReadListener; import javax.batch.api.chunk.listener.SkipWriteListener; +import javax.batch.operations.BatchRuntimeException; import javax.batch.runtime.BatchStatus; +import javax.transaction.Status; + import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.BlockingQueue; import java.util.logging.Level; @@ -585,11 +589,22 @@ public class ChunkStepController extends SingleThreadedStepController { chunkProxy.afterChunk(); } - checkpointManager.checkpoint(); - - this.persistUserData(); - - transactionManager.commit(); + Map<CheckpointDataKey, CheckpointData> checkpoints = checkpointManager.prepareCheckpoints(); + PersistentDataWrapper userData = resolveUserData(); + try { + transactionManager.commit(); + storeUserData(userData); + checkpointManager.storeCheckPoints(checkpoints); + } catch (Exception e) { + // only set the Exception if we didn't blow up before anyway + if (this.stepContext.getException() == null) { + this.stepContext.setException(e); + } + if (e instanceof BatchRuntimeException) { + throw (BatchRuntimeException) e; + } + throw new BatchContainerServiceException("Cannot commit the transaction for the step.", e); + } checkpointManager.endCheckpoint(); @@ -659,14 +674,15 @@ public class ChunkStepController extends SingleThreadedStepController { */ private void rollback(final Throwable t) { try { - // ignore, we blow up anyway - transactionManager.setRollbackOnly(); try { 
doClose(); } catch (Exception e) { // ignore, we blow up anyway } + // ignore, we blow up anyway + transactionManager.setRollbackOnly(); + if (t instanceof Exception) { Exception e = (Exception) t; for (ChunkListener chunkProxy : chunkListeners) { @@ -681,7 +697,10 @@ public class ChunkStepController extends SingleThreadedStepController { // ever come up in the spec, but seems marginally more useful. stepContext.getMetric(MetricImpl.MetricType.ROLLBACK_COUNT).incValue(); } finally { - transactionManager.rollback(); + int txStatus = transactionManager.getStatus(); + if (txStatus == Status.STATUS_ACTIVE || txStatus == Status.STATUS_MARKED_ROLLBACK) { + transactionManager.rollback(); + } throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", t); } }
