BATCHEE-112 persistence service doesn't support multiple executions for the getRunningExecutions case of JobOperator when set to memory (default)
Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/a0ca7ae8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/a0ca7ae8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/a0ca7ae8 Branch: refs/heads/master Commit: a0ca7ae832547f5cc6b67c83a53e0bcbfb044f9d Parents: a69bd38 Author: Romain manni-Bucau <[email protected]> Authored: Wed Sep 7 09:10:17 2016 +0200 Committer: Romain manni-Bucau <[email protected]> Committed: Wed Sep 7 09:10:17 2016 +0200 ---------------------------------------------------------------------- .../batchee/container/impl/JobOperatorImpl.java | 2 +- .../SimpleJobExecutionCallbackService.java | 2 +- .../MemoryPersistenceManagerService.java | 27 ++++------- .../batchee/component/SimpleBatchlet.java | 37 +++++++++++++++ .../container/impl/JobOperatorImplTest.java | 49 ++++++++++++++++++++ .../resources/META-INF/batch-jobs/simple.xml | 14 ++++++ 6 files changed, 112 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/a0ca7ae8/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java ---------------------------------------------------------------------- diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java index d1f5a3a..27082f7 100755 --- a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java +++ b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java @@ -255,7 +255,7 @@ public class JobOperatorImpl implements JobOperator { // get the jobexecution ids associated with this job name final Set<Long> executionIds = persistenceManagerService.jobOperatorGetRunningExecutions(jobName); - if (executionIds.size() <= 0) { + if (executionIds.isEmpty()) { 
throw new NoSuchJobException("Job Name " + jobName + " not found"); } http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/a0ca7ae8/jbatch/src/main/java/org/apache/batchee/container/services/callback/SimpleJobExecutionCallbackService.java ---------------------------------------------------------------------- diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/callback/SimpleJobExecutionCallbackService.java b/jbatch/src/main/java/org/apache/batchee/container/services/callback/SimpleJobExecutionCallbackService.java index 1416894..faf0f60 100644 --- a/jbatch/src/main/java/org/apache/batchee/container/services/callback/SimpleJobExecutionCallbackService.java +++ b/jbatch/src/main/java/org/apache/batchee/container/services/callback/SimpleJobExecutionCallbackService.java @@ -57,7 +57,7 @@ public class SimpleJobExecutionCallbackService implements JobExecutionCallbackSe // check before blocking final InternalJobExecution finalCheckExec = ServicesManager.find().service(BatchKernelService.class).getJobExecution(id); - if (Batches.isDone(finalCheckExec.getBatchStatus())) { + if (finalCheckExec != null && Batches.isDone(finalCheckExec.getBatchStatus())) { waiters.remove(id); return; } http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/a0ca7ae8/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManagerService.java ---------------------------------------------------------------------- diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManagerService.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManagerService.java index ff988f4..7ca4aef 100644 --- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManagerService.java +++ b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/MemoryPersistenceManagerService.java @@ -232,7 +232,7 @@ public class 
MemoryPersistenceManagerService implements PersistenceManagerServic public List<InternalJobExecution> jobOperatorGetJobExecutions(final long jobInstanceId) { final List<InternalJobExecution> list = new LinkedList<InternalJobExecution>(); final Structures.JobInstanceData jobInstanceData = data.jobInstanceData.get(jobInstanceId); - if (jobInstanceData == null || jobInstanceData.executions == null) { + if (jobInstanceData == null) { return list; } synchronized (jobInstanceData.executions) { @@ -245,27 +245,20 @@ public class MemoryPersistenceManagerService implements PersistenceManagerServic @Override public Set<Long> jobOperatorGetRunningExecutions(final String jobName) { - Structures.JobInstanceData jobInstanceData = null; - for (final Structures.JobInstanceData instanceData : data.jobInstanceData.values()) { - if (instanceData.instance.getJobName().equals(jobName)) { - jobInstanceData = instanceData; - break; - } - } - - if (jobInstanceData == null) { - return Collections.emptySet(); - } - final Set<Long> set = new HashSet<Long>(); - synchronized (jobInstanceData.executions) { - for (final Structures.ExecutionInstanceData executionInstanceData : jobInstanceData.executions) { - if (RUNNING_STATUSES.contains(executionInstanceData.execution.getBatchStatus())) { - set.add(executionInstanceData.execution.getExecutionId()); + for (final Structures.JobInstanceData instanceData : data.jobInstanceData.values()) { + if (instanceData.instance.getJobName().equals(jobName)) { + synchronized (instanceData.executions) { + for (final Structures.ExecutionInstanceData executionInstanceData : instanceData.executions) { + if (RUNNING_STATUSES.contains(executionInstanceData.execution.getBatchStatus())) { + set.add(executionInstanceData.execution.getExecutionId()); + } + } } } } + return set; } http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/a0ca7ae8/jbatch/src/test/java/org/apache/batchee/component/SimpleBatchlet.java 
---------------------------------------------------------------------- diff --git a/jbatch/src/test/java/org/apache/batchee/component/SimpleBatchlet.java b/jbatch/src/test/java/org/apache/batchee/component/SimpleBatchlet.java new file mode 100644 index 0000000..3127063 --- /dev/null +++ b/jbatch/src/test/java/org/apache/batchee/component/SimpleBatchlet.java @@ -0,0 +1,37 @@ +/* + * Copyright 2012 International Business Machines Corp. + * + * See the NOTICE file distributed with this work for additional information + * regarding copyright ownership. Licensed under the Apache License, + * Version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.batchee.component; + +import javax.batch.api.AbstractBatchlet; +import javax.batch.api.BatchProperty; +import javax.inject.Inject; + +import static java.lang.Thread.sleep; + +public class SimpleBatchlet extends AbstractBatchlet { + @Inject + @BatchProperty + private Long duration; + + @Override + public String process() throws Exception { + if (duration != null && duration > 0) { + sleep(duration); + } + return "ok"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/a0ca7ae8/jbatch/src/test/java/org/apache/batchee/container/impl/JobOperatorImplTest.java ---------------------------------------------------------------------- diff --git a/jbatch/src/test/java/org/apache/batchee/container/impl/JobOperatorImplTest.java b/jbatch/src/test/java/org/apache/batchee/container/impl/JobOperatorImplTest.java new file mode 100644 index 0000000..4042cf5 --- /dev/null +++ b/jbatch/src/test/java/org/apache/batchee/container/impl/JobOperatorImplTest.java @@ -0,0 +1,49 @@ +/* + * Copyright 2012 International Business Machines Corp. + * + * See the NOTICE file distributed with this work for additional information + * regarding copyright ownership. Licensed under the Apache License, + * Version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.batchee.container.impl; + +import org.apache.batchee.container.services.ServicesManager; +import org.apache.batchee.container.services.persistence.MemoryPersistenceManagerService; +import org.apache.batchee.spi.PersistenceManagerService; +import org.junit.Test; + +import javax.batch.operations.JobOperator; +import java.util.List; +import java.util.Properties; + +import static java.util.Collections.singletonList; +import static org.apache.batchee.util.Batches.waitForEnd; +import static org.junit.Assert.assertEquals; + +public class JobOperatorImplTest { + @Test + public void runningExecutionMemory_BATCHEE112() { + final JobOperator operator = new JobOperatorImpl(new ServicesManager() {{ + init(new Properties() {{ + setProperty(PersistenceManagerService.class.getSimpleName(), MemoryPersistenceManagerService.class.getName()); + }}); + }}); + for (int i = 0; i < 10; i++) { + final long id = operator.start("simple", new Properties() {{ + setProperty("duration", "3000"); + }}); + final List<Long> running = operator.getRunningExecutions("simple"); + assertEquals("Iteration: " + i, singletonList(id), running); + waitForEnd(operator, id); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/a0ca7ae8/jbatch/src/test/resources/META-INF/batch-jobs/simple.xml ---------------------------------------------------------------------- diff --git a/jbatch/src/test/resources/META-INF/batch-jobs/simple.xml b/jbatch/src/test/resources/META-INF/batch-jobs/simple.xml new file mode 100644 index 0000000..9b73787 --- /dev/null +++ b/jbatch/src/test/resources/META-INF/batch-jobs/simple.xml @@ -0,0 +1,14 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- See the NOTICE file distributed with this work for additional information + regarding copyright ownership. Licensed under the Apache License, Version + 2.0 (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. --> +<job id="simple" version="1.0" xmlns="http://xmlns.jcp.org/xml/ns/javaee"> + <step id="exec"> + <batchlet ref="org.apache.batchee.component.SimpleBatchlet" /> + </step> +</job>
