IGNITE-4231 - Hangs on compute result serialization error. Fix
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6d744db2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6d744db2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6d744db2 Branch: refs/heads/master Commit: 6d744db27111c68e13b06ec99428a4c4148d97b6 Parents: ceb60d2 Author: dkarachentsev <[email protected]> Authored: Mon Dec 12 11:44:57 2016 +0300 Committer: dkarachentsev <[email protected]> Committed: Mon Dec 12 11:44:57 2016 +0300 ---------------------------------------------------------------------- .../internal/processors/job/GridJobWorker.java | 69 +++++++- .../closure/GridClosureSerializationTest.java | 177 +++++++++++++++++++ .../ignite/testsuites/IgniteBasicTestSuite.java | 2 + 3 files changed, 241 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6d744db2/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index 5f38b29..f5c6a27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.job; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; @@ -790,6 +791,64 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { } else { try { + byte[] resBytes = null; + byte[] exBytes = null; + byte[] attrBytes = null; + + boolean loc = ctx.localNodeId().equals(sndNode.id()) && !ctx.config().isMarshalLocalJobs(); + + Map<Object, Object> attrs = jobCtx.getAttributes(); + + // Try serialize response, and if exception - return to client. + if (!loc) { + try { + resBytes = U.marshal(marsh, res); + } + catch (IgniteCheckedException e) { + resBytes = U.marshal(marsh, null); + + if (ex != null) + ex.addSuppressed(e); + else + ex = U.convertException(e); + + U.error(log, "Failed to serialize job response [nodeId=" + taskNode.id() + + ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job + + ", resCls=" + (res == null ? null : res.getClass()) + ']', e); + } + + try { + attrBytes = U.marshal(marsh, attrs); + } + catch (IgniteCheckedException e) { + attrBytes = U.marshal(marsh, Collections.emptyMap()); + + if (ex != null) + ex.addSuppressed(e); + else + ex = U.convertException(e); + + U.error(log, "Failed to serialize job attributes [nodeId=" + taskNode.id() + + ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job + + ", attrs=" + attrs + ']', e); + } + + try { + exBytes = U.marshal(marsh, ex); + } + catch (IgniteCheckedException e) { + String msg = "Failed to serialize job exception [nodeId=" + taskNode.id() + + ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job + + ", msg=\"" + e.getMessage() + "\"]"; + + ex = new IgniteException(msg); + + U.error(log, msg, e); + + exBytes = U.marshal(marsh, ex); + } + } + if (ex != null) { if (isStarted) { // Job failed. @@ -804,19 +863,15 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { else if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED)) evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null); - boolean loc = ctx.localNodeId().equals(sndNode.id()) && !ctx.config().isMarshalLocalJobs(); - - Map<Object, Object> attrs = jobCtx.getAttributes(); - GridJobExecuteResponse jobRes = new GridJobExecuteResponse( ctx.localNodeId(), ses.getId(), ses.getJobId(), - loc ? null : U.marshal(marsh, ex), + exBytes, loc ? ex : null, - loc ? null: U.marshal(marsh, res), + resBytes, loc ? res : null, - loc ? null : U.marshal(marsh, attrs), + attrBytes, loc ? attrs : null, isCancelled(), retry ? ctx.cache().context().exchange().readyAffinityVersion() : null); http://git-wip-us.apache.org/repos/asf/ignite/blob/6d744db2/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureSerializationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureSerializationTest.java new file mode 100644 index 0000000..2426dd7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureSerializationTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.closure; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.concurrent.Callable; +import org.apache.ignite.IgniteException; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.compute.ComputeJobContext; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.resources.JobContextResource; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Tests handling of job result serialization error. + */ +public class GridClosureSerializationTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception { + final IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setMarshaller(null); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrid(0); + startGrid(1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "Convert2Lambda"}) + public void testSerializationFailure() throws Exception { + final IgniteEx ignite0 = grid(0); + final IgniteEx ignite1 = grid(1); + + GridTestUtils.assertThrows(null, new Callable<Object>() { + @Override public Object call() throws Exception { + ignite1.compute(ignite1.cluster().forNode(ignite0.localNode())).call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + return new CaseClass.CaseClass2(); + } + }); + + return null; + } + }, BinaryObjectException.class, null); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "Convert2Lambda"}) + public void testExceptionSerializationFailure() throws Exception { + final IgniteEx ignite0 = grid(0); + final IgniteEx ignite1 = grid(1); + + GridTestUtils.assertThrows(null, new Callable<Object>() { + @Override public Object call() throws Exception { + ignite1.compute(ignite1.cluster().forNode(ignite0.localNode())).call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + throw new BrokenException(); + } + }); + + return null; + } + }, IgniteException.class, null); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "Convert2Lambda"}) + public void testAttributesSerializationFailure() throws Exception { + final IgniteEx ignite0 = grid(0); + final IgniteEx ignite1 = grid(1); + + GridTestUtils.assertThrows(null, new Callable<Object>() { + @JobContextResource + private ComputeJobContext jobCtx; + + @Override public Object call() throws Exception { + ignite1.compute(ignite1.cluster().forNode(ignite0.localNode())).call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + jobCtx.setAttribute("test-attr", new BrokenAttribute()); + + return null; + } + }); + + return null; + } + }, IgniteException.class, null); + } + + /** + * Binary marshaller will fail because subclass defines other field with different case. + */ + @SuppressWarnings("unused") + private static class CaseClass { + /** */ + private String val; + + /** + * + */ + private static class CaseClass2 extends CaseClass { + /** */ + private String vAl; + } + } + + /** + * + */ + private static class BrokenAttribute implements Externalizable { + /** {@inheritDoc} */ + @Override public void writeExternal(final ObjectOutput out) throws IOException { + throw new IOException("Test exception"); + } + + /** {@inheritDoc} */ + @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + throw new IOException("Test exception"); + } + } + + /** + * + */ + private static class BrokenException extends Exception implements Externalizable { + /** {@inheritDoc} */ + @Override public void writeExternal(final ObjectOutput out) throws IOException { + throw new IOException("Test exception"); + } + + /** {@inheritDoc} */ + @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + throw new IOException("Test exception"); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6d744db2/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 6ab0885..1c1fcf7 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.IgniteDaemonNodeMarshallerCac import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheConcurrentReadWriteTest; import org.apache.ignite.internal.processors.cache.OffHeapTieredTransactionSelfTest; import org.apache.ignite.internal.processors.closure.GridClosureProcessorSelfTest; +import org.apache.ignite.internal.processors.closure.GridClosureSerializationTest; import org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest; import org.apache.ignite.internal.processors.continuous.GridMessageListenSelfTest; import org.apache.ignite.internal.processors.odbc.OdbcProcessorValidationSelfTest; @@ -109,6 +110,7 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTestSuite(GridProductVersionSelfTest.class); suite.addTestSuite(GridAffinityProcessorRendezvousSelfTest.class); suite.addTestSuite(GridClosureProcessorSelfTest.class); + suite.addTestSuite(GridClosureSerializationTest.class); suite.addTestSuite(ClosureServiceClientsNodesTest.class); suite.addTestSuite(GridStartStopSelfTest.class); suite.addTestSuite(GridProjectionForCachesSelfTest.class);
