Repository: incubator-batchee Updated Branches: refs/heads/master ce60c30e8 -> 1a6634f10
BATCHEE-113 added test for improved scope handling Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/1a6634f1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/1a6634f1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/1a6634f1 Branch: refs/heads/master Commit: 1a6634f10952f2aee626741c327410878f6ffbe5 Parents: ce60c30 Author: Reinhard Sandtner <[email protected]> Authored: Tue Dec 20 21:37:37 2016 +0100 Committer: Reinhard Sandtner <[email protected]> Committed: Tue Dec 20 23:56:00 2016 +0100 ---------------------------------------------------------------------- .../org/apache/batchee/cdi/BatchScopesTest.java | 41 +++++++++- .../batchee/cdi/component/JobScopedBean.java | 4 + .../partitioned/PartitionedJobScopedReader.java | 86 ++++++++++++++++++++ .../partitioned/PartitionedJobScopedWriter.java | 46 +++++++++++ .../batch-jobs/partitioned-job-scoped.xml | 51 ++++++++++++ 5 files changed, 225 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/1a6634f1/extensions/cdi/src/test/java/org/apache/batchee/cdi/BatchScopesTest.java ---------------------------------------------------------------------- diff --git a/extensions/cdi/src/test/java/org/apache/batchee/cdi/BatchScopesTest.java b/extensions/cdi/src/test/java/org/apache/batchee/cdi/BatchScopesTest.java index 38c9cde..380c111 100644 --- a/extensions/cdi/src/test/java/org/apache/batchee/cdi/BatchScopesTest.java +++ b/extensions/cdi/src/test/java/org/apache/batchee/cdi/BatchScopesTest.java @@ -19,20 +19,29 @@ package org.apache.batchee.cdi; import org.apache.batchee.cdi.component.Holder; import org.apache.batchee.cdi.component.JobScopedBean; import org.apache.batchee.cdi.component.StepScopedBean; +import org.apache.batchee.cdi.partitioned.PartitionedJobScopedReader; import org.apache.batchee.cdi.testng.CdiContainerLifecycle; import org.apache.batchee.util.Batches; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Listeners; import org.testng.annotations.Test; import javax.batch.operations.JobOperator; import javax.batch.runtime.BatchRuntime; +import javax.batch.runtime.BatchStatus; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotSame; -import static org.testng.Assert.assertTrue; +import java.util.Properties; + +import static org.testng.Assert.*; @Listeners(CdiContainerLifecycle.class) public class BatchScopesTest { + + @BeforeMethod + public void resetJobScopedBean() { + JobScopedBean.reset(); + } + @Test public void test() { final JobOperator jobOperator = BatchRuntime.getJobOperator(); @@ -47,4 +56,30 @@ public class BatchScopesTest { assertTrue(JobScopedBean.isDestroyed()); assertTrue(StepScopedBean.isDestroyed()); } + + @Test + public void testPartitionedJobScoped() throws Exception { + + JobOperator jobOperator = BatchRuntime.getJobOperator(); + + long executionId = jobOperator.start("partitioned-job-scoped", new Properties()); + + Thread.sleep(100); + + assertEquals(PartitionedJobScopedReader.currentBeanId(), PartitionedJobScopedReader.originalBeanId()); + + PartitionedJobScopedReader.stop(); + + Thread.sleep(100); + + assertEquals(PartitionedJobScopedReader.currentBeanId(), PartitionedJobScopedReader.originalBeanId()); + assertFalse(JobScopedBean.isDestroyed(), "JobScopedBean must not be destroyed -> partition 2 is still running :("); + + PartitionedJobScopedReader.stopPartition2(); + + assertEquals(Batches.waitFor(executionId), BatchStatus.COMPLETED); + + assertEquals(PartitionedJobScopedReader.currentBeanId(), PartitionedJobScopedReader.originalBeanId()); + assertTrue(JobScopedBean.isDestroyed()); + } } http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/1a6634f1/extensions/cdi/src/test/java/org/apache/batchee/cdi/component/JobScopedBean.java ---------------------------------------------------------------------- diff --git a/extensions/cdi/src/test/java/org/apache/batchee/cdi/component/JobScopedBean.java b/extensions/cdi/src/test/java/org/apache/batchee/cdi/component/JobScopedBean.java index 7b66b35..902eabf 100644 --- a/extensions/cdi/src/test/java/org/apache/batchee/cdi/component/JobScopedBean.java +++ b/extensions/cdi/src/test/java/org/apache/batchee/cdi/component/JobScopedBean.java @@ -41,4 +41,8 @@ public class JobScopedBean { void destroy() { destroyed = true; } + + public static void reset() { + destroyed = false; + } } http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/1a6634f1/extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedJobScopedReader.java ---------------------------------------------------------------------- diff --git a/extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedJobScopedReader.java b/extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedJobScopedReader.java new file mode 100644 index 0000000..3b9aedc --- /dev/null +++ b/extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedJobScopedReader.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.batchee.cdi.partitioned; + +import org.apache.batchee.cdi.component.JobScopedBean; + +import javax.batch.api.BatchProperty; +import javax.batch.api.chunk.AbstractItemReader; +import javax.enterprise.context.Dependent; +import javax.inject.Inject; +import javax.inject.Named; +import java.util.logging.Level; +import java.util.logging.Logger; + +@Named +@Dependent +public class PartitionedJobScopedReader extends AbstractItemReader { + + private static final Logger LOG = Logger.getLogger(PartitionedJobScopedReader.class.getName()); + + private static volatile boolean stop; + private static volatile boolean stopPartition2; + + private static long originalBeanId; + private static long currentBeanId; + + private boolean firstRun = true; + + @Inject + @BatchProperty + private Integer partition; + + @Inject + private JobScopedBean jobScopedBean; + + + @Override + public Object readItem() throws Exception { + + if (firstRun) { + originalBeanId = jobScopedBean.getId(); + } + + currentBeanId = jobScopedBean.getId(); + + if (partition == 2 && stopPartition2 || partition != 2 && stop) { + LOG.log(Level.INFO, "Finished partition "+ partition); + return null; + } + + Thread.sleep(10); + return "Partition " + partition + ": JobScopedBean destroyed? " + JobScopedBean.isDestroyed(); + } + + + public static void stop() { + LOG.log(Level.INFO, "Stopping all partitions except Partition 2"); + stop = true; + } + + public static void stopPartition2() { + stopPartition2 = true; + } + + public static long originalBeanId() { + return originalBeanId; + } + + public static long currentBeanId() { + return currentBeanId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/1a6634f1/extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedJobScopedWriter.java ---------------------------------------------------------------------- diff --git a/extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedJobScopedWriter.java b/extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedJobScopedWriter.java new file mode 100644 index 0000000..48c51aa --- /dev/null +++ b/extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedJobScopedWriter.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.batchee.cdi.partitioned; + +import org.apache.batchee.cdi.component.JobScopedBean; + +import javax.batch.api.BatchProperty; +import javax.batch.api.chunk.AbstractItemWriter; +import javax.enterprise.context.Dependent; +import javax.inject.Inject; +import javax.inject.Named; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +@Named +@Dependent +public class PartitionedJobScopedWriter extends AbstractItemWriter { + + private static final Logger LOG = Logger.getLogger(PartitionedJobScopedWriter.class.getName()); + + @Inject + @BatchProperty + private Integer partition; + + + @Override + public void writeItems(List<Object> items) throws Exception { + LOG.log(Level.INFO, "Writing {0} items from partition {1}", new Object[]{items.size(), partition}); + LOG.log(Level.INFO, "JobScopedBean destroyed? {0}", JobScopedBean.isDestroyed()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/1a6634f1/extensions/cdi/src/test/resources/META-INF/batch-jobs/partitioned-job-scoped.xml ---------------------------------------------------------------------- diff --git a/extensions/cdi/src/test/resources/META-INF/batch-jobs/partitioned-job-scoped.xml b/extensions/cdi/src/test/resources/META-INF/batch-jobs/partitioned-job-scoped.xml new file mode 100644 index 0000000..58f9fd5 --- /dev/null +++ b/extensions/cdi/src/test/resources/META-INF/batch-jobs/partitioned-job-scoped.xml @@ -0,0 +1,51 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + See the NOTICE file distributed with this work for additional information + regarding copyright ownership. Licensed under the Apache License, + Version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<job id="partitioned-job-scoped-job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> + + <step id="partitioned-step"> + + <chunk item-count="100"> + <reader ref="partitionedJobScopedReader"> + <properties> + <property name="partition" value="#{partitionPlan['index']}" /> + </properties> + </reader> + <writer ref="partitionedJobScopedWriter"> + <properties> + <property name="partition" value="#{partitionPlan['index']}" /> + </properties> + </writer> + </chunk> + + <partition> + + <plan partitions="2" threads="2"> + + <properties partition="0"> + <property name="index" value="1" /> + </properties> + + <properties partition="1"> + <property name="index" value="2" /> + </properties> + + </plan> + + </partition> + + </step> + +</job>
