Repository: ignite
Updated Branches:
  refs/heads/master 6a1aa1f06 -> 947962f78


Fixed a problem with optimized marshaller.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/947962f7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/947962f7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/947962f7

Branch: refs/heads/master
Commit: 947962f785698f4c6bb749f4acce4a96651659fd
Parents: 6a1aa1f
Author: devozerov <[email protected]>
Authored: Mon Sep 10 22:55:55 2018 +0300
Committer: devozerov <[email protected]>
Committed: Mon Sep 10 22:55:55 2018 +0300

----------------------------------------------------------------------
 .../configuration/TransactionConfiguration.java | 20 +++++
 .../optimized/OptimizedClassDescriptor.java     | 87 +++++++++++++-------
 .../service/GridServiceProcessor.java           |  3 -
 .../processors/task/GridTaskWorker.java         | 72 +++++++++-------
 .../internal/util/SerializableTransient.java    | 10 +--
 .../internal/util/TransientSerializable.java    | 55 +++++++++++++
 .../ignite/marshaller/MarshallerUtils.java      | 22 ++++-
 7 files changed, 197 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/947962f7/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
index 0ac215f..e669bcf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java
@@ -19,7 +19,9 @@ package org.apache.ignite.configuration;
 
 import java.io.Serializable;
 import javax.cache.configuration.Factory;
+import org.apache.ignite.internal.util.TransientSerializable;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
@@ -27,8 +29,12 @@ import org.apache.ignite.transactions.TransactionIsolation;
 /**
  * Transactions configuration.
  */
+@TransientSerializable(methodName = "transientSerializableFields")
 public class TransactionConfiguration implements Serializable {
     /** */
+    private static final IgniteProductVersion TX_PME_TIMEOUT_SINCE = 
IgniteProductVersion.fromString("2.5.1");
+
+    /** */
     private static final long serialVersionUID = 0L;
 
     /** Default value for 'txSerializableEnabled' flag. */
@@ -387,4 +393,18 @@ public class TransactionConfiguration implements 
Serializable {
     @Override public String toString() {
         return S.toString(TransactionConfiguration.class, this);
     }
+
+    /**
+     * Excludes incompatible fields from serialization/deserialization process.
+     *
+     * @param ver Sender/Receiver node version.
+     * @return Array of fields excluded from serialization/deserialization.
+     */
+    @SuppressWarnings("unused")
+    private static String[] transientSerializableFields(IgniteProductVersion 
ver) {
+        if (TX_PME_TIMEOUT_SINCE.compareToIgnoreTimestamp(ver) >= 0)
+            return new String[] { "txTimeoutOnPartitionMapExchange" };
+
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/947962f7/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedClassDescriptor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedClassDescriptor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedClassDescriptor.java
index ccd9946..0369b66 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedClassDescriptor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedClassDescriptor.java
@@ -43,16 +43,18 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.SerializableTransient;
+import org.apache.ignite.internal.util.TransientSerializable;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.marshaller.MarshallerContext;
 import org.apache.ignite.marshaller.MarshallerExclusions;
-import org.apache.ignite.marshaller.MarshallerUtils;
 
 import static java.lang.reflect.Modifier.isFinal;
 import static java.lang.reflect.Modifier.isPrivate;
@@ -92,6 +94,8 @@ import static 
org.apache.ignite.internal.marshaller.optimized.OptimizedMarshalle
 import static 
org.apache.ignite.internal.marshaller.optimized.OptimizedMarshallerUtils.STR;
 import static 
org.apache.ignite.internal.marshaller.optimized.OptimizedMarshallerUtils.UUID;
 import static 
org.apache.ignite.internal.marshaller.optimized.OptimizedMarshallerUtils.computeSerialVersionUid;
+import static org.apache.ignite.marshaller.MarshallerUtils.jobReceiverVersion;
+import static org.apache.ignite.marshaller.MarshallerUtils.jobSenderVersion;
 
 /**
  * Class descriptor.
@@ -172,6 +176,9 @@ class OptimizedClassDescriptor {
     /** Method returns serializable transient fields. */
     private Method serTransMtd;
 
+    /** Method returns transient serializable fields. */
+    private Method transSerMtd;
+
     /**
      * Creates descriptor for class.
      *
@@ -448,16 +455,16 @@ class OptimizedClassDescriptor {
                         readObjMtds.add(mtd);
 
                         final SerializableTransient serTransAn = 
c.getAnnotation(SerializableTransient.class);
+                        final TransientSerializable transSerAn = 
c.getAnnotation(TransientSerializable.class);
 
                         // Custom serialization policy for transient fields.
                         if (serTransAn != null) {
                             try {
-                                serTransMtd = 
c.getDeclaredMethod(serTransAn.methodName(), cls, IgniteProductVersion.class);
+                                serTransMtd = 
c.getDeclaredMethod(serTransAn.methodName(), IgniteProductVersion.class);
 
                                 int mod = serTransMtd.getModifiers();
 
-                                if (isStatic(mod) && isPrivate(mod)
-                                    && serTransMtd.getReturnType() == 
String[].class)
+                                if (isStatic(mod) && isPrivate(mod) && 
serTransMtd.getReturnType() == String[].class)
                                     serTransMtd.setAccessible(true);
                                 else
                                     // Set method back to null if it has 
incorrect signature.
@@ -468,6 +475,24 @@ class OptimizedClassDescriptor {
                             }
                         }
 
+                        // Custom serialization policy for non-transient 
fields.
+                        if (transSerAn != null) {
+                            try {
+                                transSerMtd = 
c.getDeclaredMethod(transSerAn.methodName(), IgniteProductVersion.class);
+
+                                int mod = transSerMtd.getModifiers();
+
+                                if (isStatic(mod) && isPrivate(mod) && 
transSerMtd.getReturnType() == String[].class)
+                                    transSerMtd.setAccessible(true);
+                                else
+                                    // Set method back to null if it has 
incorrect signature.
+                                    transSerMtd = null;
+                            }
+                            catch (NoSuchMethodException ignored) {
+                                transSerMtd = null;
+                            }
+                        }
+
                         Field[] clsFields0 = c.getDeclaredFields();
 
                         Map<String, Field> fieldNames = new HashMap<>();
@@ -824,7 +849,7 @@ class OptimizedClassDescriptor {
                 writeTypeData(out);
 
                 out.writeShort(checksum);
-                out.writeSerializable(obj, writeObjMtds, 
serializableFields(obj.getClass(), obj, null));
+                out.writeSerializable(obj, writeObjMtds, 
fields(obj.getClass(), jobReceiverVersion()));
 
                 break;
 
@@ -840,45 +865,52 @@ class OptimizedClassDescriptor {
      * ignored.
      *
      * @param cls Class.
-     * @param obj Object.
      * @param ver Job sender version.
      * @return Serializable fields.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    private Fields serializableFields(Class<?> cls, Object obj, 
IgniteProductVersion ver) {
-        if (serTransMtd == null)
+    private Fields fields(Class<?> cls, IgniteProductVersion ver) {
+        if (ver == null // No context available.
+            || serTransMtd == null && transSerMtd == null)
             return fields;
 
         try {
-            final String[] transFields = (String[])serTransMtd.invoke(cls, 
obj, ver);
+            final String[] transFields = serTransMtd == null ? null : 
(String[])serTransMtd.invoke(null, ver);
+            final String[] serFields = transSerMtd == null ? null : 
(String[])transSerMtd.invoke(null, ver);
 
-            if (transFields == null || transFields.length == 0)
+            if (F.isEmpty(transFields) && F.isEmpty(serFields))
                 return fields;
 
-            List<FieldInfo> clsFields = new ArrayList<>();
+            Map<String, FieldInfo> clsFields = new TreeMap<>();
 
-            clsFields.addAll(fields.fields.get(0).fields);
+            for (FieldInfo field : fields.fields.get(0).fields) {
+                clsFields.put(field.fieldName, field);
+            }
 
-            for (int i = 0; i < transFields.length; i++) {
-                final String fieldName = transFields[i];
+            // Add serializable transient fields
+            if (!F.isEmpty(transFields)) {
+                for (int i = 0; i < transFields.length; i++) {
+                    final String fieldName = transFields[i];
 
-                final Field f = cls.getDeclaredField(fieldName);
+                    final Field f = cls.getDeclaredField(fieldName);
 
-                FieldInfo fieldInfo = new FieldInfo(f, f.getName(),
-                    GridUnsafe.objectFieldOffset(f), fieldType(f.getType()));
+                    FieldInfo fieldInfo = new FieldInfo(f, f.getName(),
+                        GridUnsafe.objectFieldOffset(f), 
fieldType(f.getType()));
 
-                clsFields.add(fieldInfo);
+                    clsFields.put(fieldName, fieldInfo);
+                }
             }
 
-            Collections.sort(clsFields, new Comparator<FieldInfo>() {
-                @Override public int compare(FieldInfo t1, FieldInfo t2) {
-                    return t1.name().compareTo(t2.name());
+            // Exclude non-transient fields which shouldn't be serialized.
+            if (!F.isEmpty(serFields)) {
+                for (int i = 0; i < serFields.length; i++) {
+                    clsFields.remove(serFields[i]);
                 }
-            });
+            }
 
-            List<ClassFields> fields = new ArrayList<>();
+            List<ClassFields> fields = new ArrayList<>(1);
 
-            fields.add(new ClassFields(clsFields));
+            fields.add(new ClassFields(new ArrayList<>(clsFields.values())));
 
             return new Fields(fields);
         }
@@ -919,12 +951,7 @@ class OptimizedClassDescriptor {
             case SERIALIZABLE:
                 verifyChecksum(in.readShort());
 
-                // If no serialize method, then unmarshal as usual.
-                if (serTransMtd != null)
-                    return in.readSerializable(cls, readObjMtds, 
readResolveMtd,
-                        serializableFields(cls, null, 
MarshallerUtils.jobSenderVersion()));
-                else
-                    return in.readSerializable(cls, readObjMtds, 
readResolveMtd, fields);
+                return in.readSerializable(cls, readObjMtds, readResolveMtd, 
fields(cls, jobSenderVersion()));
 
             default:
                 assert false : "Unexpected type: " + type;

http://git-wip-us.apache.org/repos/asf/ignite/blob/947962f7/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 9e8c7fa..fce7053 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -69,7 +69,6 @@ import 
org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import 
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage;
 import 
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
@@ -79,7 +78,6 @@ import 
org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridEmptyIterator;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.SerializableTransient;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -2127,7 +2125,6 @@ public class GridServiceProcessor extends 
GridProcessorAdapter implements Ignite
     /**
      */
     @GridInternal
-    @SerializableTransient(methodName = "serializableTransient")
     private static class ServiceTopologyCallable implements 
IgniteCallable<Map<UUID, Integer>> {
         /** */
         private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/ignite/blob/947962f7/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 82ad5d5..b73737c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -84,6 +84,7 @@ import 
org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.resources.TaskContinuousMapperResource;
 import org.jetbrains.annotations.Nullable;
 
@@ -1375,38 +1376,45 @@ class GridTaskWorker<T, R> extends GridWorker 
implements GridTimeoutObject {
 
                     boolean forceLocDep = internal || !ctx.deploy().enabled();
 
-                    req = new GridJobExecuteRequest(
-                        ses.getId(),
-                        res.getJobContext().getJobId(),
-                        ses.getTaskName(),
-                        ses.getUserVersion(),
-                        ses.getTaskClassName(),
-                        loc ? null : U.marshal(marsh, res.getJob()),
-                        loc ? res.getJob() : null,
-                        ses.getStartTime(),
-                        timeout,
-                        ses.getTopology(),
-                        loc ? ses.getTopologyPredicate() : null,
-                        loc ? null : U.marshal(marsh, 
ses.getTopologyPredicate()),
-                        loc ? null : U.marshal(marsh, ses.getJobSiblings()),
-                        loc ? ses.getJobSiblings() : null,
-                        loc ? null : U.marshal(marsh, sesAttrs),
-                        loc ? sesAttrs : null,
-                        loc ? null : U.marshal(marsh, jobAttrs),
-                        loc ? jobAttrs : null,
-                        ses.getCheckpointSpi(),
-                        dep.classLoaderId(),
-                        dep.deployMode(),
-                        continuous,
-                        dep.participants(),
-                        forceLocDep,
-                        ses.isFullSupport(),
-                        internal,
-                        subjId,
-                        affCacheIds,
-                        affPartId,
-                        mapTopVer,
-                        ses.executorName());
+                    try {
+                        MarshallerUtils.jobReceiverVersion(node.version());
+
+                        req = new GridJobExecuteRequest(
+                            ses.getId(),
+                            res.getJobContext().getJobId(),
+                            ses.getTaskName(),
+                            ses.getUserVersion(),
+                            ses.getTaskClassName(),
+                            loc ? null : U.marshal(marsh, res.getJob()),
+                            loc ? res.getJob() : null,
+                            ses.getStartTime(),
+                            timeout,
+                            ses.getTopology(),
+                            loc ? ses.getTopologyPredicate() : null,
+                            loc ? null : U.marshal(marsh, 
ses.getTopologyPredicate()),
+                            loc ? null : U.marshal(marsh, 
ses.getJobSiblings()),
+                            loc ? ses.getJobSiblings() : null,
+                            loc ? null : U.marshal(marsh, sesAttrs),
+                            loc ? sesAttrs : null,
+                            loc ? null : U.marshal(marsh, jobAttrs),
+                            loc ? jobAttrs : null,
+                            ses.getCheckpointSpi(),
+                            dep.classLoaderId(),
+                            dep.deployMode(),
+                            continuous,
+                            dep.participants(),
+                            forceLocDep,
+                            ses.isFullSupport(),
+                            internal,
+                            subjId,
+                            affCacheIds,
+                            affPartId,
+                            mapTopVer,
+                            ses.executorName());
+                    }
+                    finally {
+                        MarshallerUtils.jobReceiverVersion(null);
+                    }
 
                     if (loc)
                         
ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req);

http://git-wip-us.apache.org/repos/asf/ignite/blob/947962f7/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java
index 14a2f27..e016009 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java
@@ -37,19 +37,17 @@ import java.lang.annotation.Target;
 public @interface SerializableTransient {
     /**
      * Name of the private static method that returns list of transient fields
-     * that should be serialized (String[]), and accepts itself (before 
serialization)
-     * and {@link IgniteProductVersion}, e.g.
+     * that should be serialized (String[]), and accepts {@link 
IgniteProductVersion}, e.g.
      * <pre>
-     *     private static String[] fields(Object self, IgniteProductVersion 
ver){
+     *     private static String[] fields(IgniteProductVersion ver){
      *         return ver.compareTo("1.5.30") > 0 ? SERIALIZABLE_FIELDS : null;
      *     }
      * </pre>
      * <p>
-     *     On serialization version argument <tt>ver</tt> is null, on 
deserialization - <tt>self</tt> is null.
+     *     On serialization the <tt>ver</tt> argument is the receiver version; on 
deserialization it is the sender version.
      * </p>
      * <p>
-     *     If it returns empty array or null all transient fields will be 
normally
-     *     ignored.
+     *     If it returns empty array or null all transient fields will be 
normally ignored.
      * </p>
      *
      * @return Name of the method.

http://git-wip-us.apache.org/repos/asf/ignite/blob/947962f7/modules/core/src/main/java/org/apache/ignite/internal/util/TransientSerializable.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/TransientSerializable.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/TransientSerializable.java
new file mode 100644
index 0000000..b583c1b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/TransientSerializable.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import org.apache.ignite.lang.IgniteProductVersion;
+
+/**
+ * Marks a class as having non-transient fields that should not be serialized.
+ * The annotated class must have a method that returns the list of non-transient
+ * fields that should not be serialized.
+ * <p>
+ *     Works only for jobs. For other messages node version is not available.
+ * </p>
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface TransientSerializable {
+    /**
+     * Name of the private static method that returns list of non-transient 
fields
+     * that should not be serialized (String[]), and accepts {@link 
IgniteProductVersion}, e.g.
+     * <pre>
+     *     private static String[] fields(IgniteProductVersion ver){
+     *         return ver.compareTo("1.5.30") < 0 ? EXCLUDED_FIELDS : null;
+     *     }
+     * </pre>
+     * <p>
+     *     On serialization the <tt>ver</tt> argument is the receiver version; on 
deserialization it is the sender version.
+     * </p>
+     * <p>
+     *     If it returns empty array or null all non-transient fields will be 
normally serialized.
+     * </p>
+     *
+     * @return Name of the method.
+     */
+    String methodName();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/947962f7/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java 
b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
index f7fef52..d5cf386 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java
@@ -45,6 +45,9 @@ public class MarshallerUtils {
     /** Job sender node version. */
     private static final ThreadLocal<IgniteProductVersion> JOB_SND_NODE_VER = 
new ThreadLocal<>();
 
+    /** Job receiver node version. */
+    private static final ThreadLocal<IgniteProductVersion> JOB_RCV_NODE_VER = 
new ThreadLocal<>();
+
     /**
      * Set node name to marshaller context if possible.
      *
@@ -96,6 +99,24 @@ public class MarshallerUtils {
     }
 
     /**
+     * Sets thread local job receiver node version.
+     *
+     * @param ver Thread local job receiver node version.
+     */
+    public static void jobReceiverVersion(IgniteProductVersion ver) {
+        JOB_RCV_NODE_VER.set(ver);
+    }
+
+    /**
+     * Returns thread local job receiver node version.
+     *
+     * @return Thread local job receiver node version.
+     */
+    public static IgniteProductVersion jobReceiverVersion() {
+        return JOB_RCV_NODE_VER.get();
+    }
+
+    /**
      * Returns class name filter for marshaller.
      *
      * @return Class name filter for marshaller.
@@ -199,5 +220,4 @@ public class MarshallerUtils {
                 "[path=" + fileName + ']', e);
         }
     }
-
 }

Reply via email to