GG-11655 - Restore service compatibility with releases before 1.5.30.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/950bad47 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/950bad47 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/950bad47 Branch: refs/heads/ignite-4242 Commit: 950bad474ef29f9b808e74034c49a69d57eb2740 Parents: 175da6b Author: dkarachentsev <[email protected]> Authored: Tue Nov 8 14:03:34 2016 +0300 Committer: dkarachentsev <[email protected]> Committed: Tue Nov 8 14:03:34 2016 +0300 ---------------------------------------------------------------------- .../internal/processors/job/GridJobWorker.java | 10 ++- .../service/GridServiceProcessor.java | 61 ++++++++++++- .../internal/util/SerializableTransient.java | 58 +++++++++++++ .../ignite/marshaller/MarshallerUtils.java | 22 +++++ .../optimized/OptimizedClassDescriptor.java | 90 +++++++++++++++++++- 5 files changed, 234 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/950bad47/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index 8169eb1..5f38b29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@ -57,6 +57,7 @@ import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.marshaller.MarshallerUtils; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_JOB_CANCELLED; @@ -421,7 +422,14 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { try { if (job == null) { - job = U.unmarshal(marsh, jobBytes, U.resolveClassLoader(dep.classLoader(), ctx.config())); + MarshallerUtils.jobSenderVersion(taskNode.version()); + + try { + job = U.unmarshal(marsh, jobBytes, U.resolveClassLoader(dep.classLoader(), ctx.config())); + } + finally { + MarshallerUtils.jobSenderVersion(null); + } // No need to hold reference any more. jobBytes = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/950bad47/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 527d360..6c26363 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -20,11 +20,14 @@ package org.apache.ignite.internal.processors.service; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; @@ -87,6 +90,7 @@ import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.internal.util.SerializableTransient; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.JobContextResource; import org.apache.ignite.resources.LoggerResource; @@ -115,6 +119,9 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** */ public static final IgniteProductVersion LAZY_SERVICES_CFG_SINCE = IgniteProductVersion.fromString("1.5.22"); + /** Versions that only compatible with each other, and from 1.5.33. */ + private static final Set<IgniteProductVersion> SERVICE_TOP_CALLABLE_VER1; + /** */ private final Boolean srvcCompatibilitySysProp; @@ -162,6 +169,31 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** Topology listener. */ private GridLocalEventListener topLsnr = new TopologyListener(); + static { + Set<IgniteProductVersion> versions = new TreeSet<>(new Comparator<IgniteProductVersion>() { + @Override public int compare(final IgniteProductVersion o1, final IgniteProductVersion o2) { + return o1.compareToIgnoreTimestamp(o2); + } + }); + + versions.add(IgniteProductVersion.fromString("1.5.30")); + versions.add(IgniteProductVersion.fromString("1.5.31")); + versions.add(IgniteProductVersion.fromString("1.5.32")); + versions.add(IgniteProductVersion.fromString("1.6.3")); + versions.add(IgniteProductVersion.fromString("1.6.4")); + versions.add(IgniteProductVersion.fromString("1.6.5")); + versions.add(IgniteProductVersion.fromString("1.6.6")); + versions.add(IgniteProductVersion.fromString("1.6.7")); + versions.add(IgniteProductVersion.fromString("1.6.8")); + versions.add(IgniteProductVersion.fromString("1.6.9")); + versions.add(IgniteProductVersion.fromString("1.6.10")); + versions.add(IgniteProductVersion.fromString("1.7.0")); + versions.add(IgniteProductVersion.fromString("1.7.1")); + versions.add(IgniteProductVersion.fromString("1.7.2")); + + SERVICE_TOP_CALLABLE_VER1 = Collections.unmodifiableSet(versions); + } + /** * @param ctx Kernal context. */ @@ -668,9 +700,13 @@ public class GridServiceProcessor extends GridProcessorAdapter { ClusterNode node = cache.affinity().mapKeyToNode(name); if (node.version().compareTo(ServiceTopologyCallable.SINCE_VER) >= 0) { + final ServiceTopologyCallable call = new ServiceTopologyCallable(name); + + call.serialize = SERVICE_TOP_CALLABLE_VER1.contains(node.version()); + return ctx.closure().callAsyncNoFailover( GridClosureCallMode.BROADCAST, - new ServiceTopologyCallable(name), + call, Collections.singletonList(node), false ).get(); @@ -1829,6 +1865,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** */ @GridInternal + @SerializableTransient(methodName = "serializableTransient") private static class ServiceTopologyCallable implements IgniteCallable<Map<UUID, Integer>> { /** */ private static final long serialVersionUID = 0L; @@ -1837,10 +1874,13 @@ public class GridServiceProcessor extends GridProcessorAdapter { private static final IgniteProductVersion SINCE_VER = IgniteProductVersion.fromString("1.5.7"); /** */ + private static final String[] SER_FIELDS = {"waitedCacheInit", "jCtx", "log"}; + + /** */ private final String svcName; /** */ - private boolean waitedCacheInit; + private transient boolean waitedCacheInit; /** */ @IgniteInstanceResource @@ -1848,11 +1888,14 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** */ @JobContextResource - private ComputeJobContext jCtx; + private transient ComputeJobContext jCtx; /** */ @LoggerResource - private IgniteLogger log; + private transient IgniteLogger log; + + /** */ + transient boolean serialize; /** * @param svcName Service name. @@ -1898,6 +1941,16 @@ public class GridServiceProcessor extends GridProcessorAdapter { return serviceTopology(cache, svcName); } + + /** + * @param self Instance of current class before serialization. + * @param ver Sender job version. + * @return List of serializable transient fields. + */ + @SuppressWarnings("unused") + private static String[] serializableTransient(ServiceTopologyCallable self, IgniteProductVersion ver) { + return (self != null && self.serialize) || (ver != null && SERVICE_TOP_CALLABLE_VER1.contains(ver)) ? SER_FIELDS : null; + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/950bad47/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java new file mode 100644 index 0000000..14a2f27 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import org.apache.ignite.lang.IgniteProductVersion; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marks class as it has transient fields that should be serialized. + * Annotated class must have method that returns list of transient + * fields that should be serialized. + * <p> + * Works only for jobs. For other messages node version is not available. + * </p> + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface SerializableTransient { + /** + * Name of the private static method that returns list of transient fields + * that should be serialized (String[]), and accepts itself (before serialization) + * and {@link IgniteProductVersion}, e.g. + * <pre> + * private static String[] fields(Object self, IgniteProductVersion ver){ + * return ver.compareTo("1.5.30") > 0 ? SERIALIZABLE_FIELDS : null; + * } + * </pre> + * <p> + * On serialization version argument <tt>ver</tt> is null, on deserialization - <tt>self</tt> is null. + * </p> + * <p> + * If it returns empty array or null all transient fields will be normally + * ignored. + * </p> + * + * @return Name of the method. + */ + String methodName(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/950bad47/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java index 9668baf..ad63702 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java @@ -17,6 +17,7 @@ package org.apache.ignite.marshaller; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.jetbrains.annotations.Nullable; @@ -24,6 +25,9 @@ import org.jetbrains.annotations.Nullable; * Utility marshaller methods. */ public class MarshallerUtils { + /** Job sender node version. */ + private static final ThreadLocal<IgniteProductVersion> JOB_SND_NODE_VER = new ThreadLocal<>(); + /** * Set node name to marshaller context if possible. * @@ -55,4 +59,22 @@ public class MarshallerUtils { private MarshallerUtils() { // No-op. } + + /** + * Sets thread local job sender node version. + * + * @param ver Thread local job sender node version. + */ + public static void jobSenderVersion(IgniteProductVersion ver) { + JOB_SND_NODE_VER.set(ver); + } + + /** + * Returns thread local job sender node version. + * + * @return Thread local job sender node version. + */ + public static IgniteProductVersion jobSenderVersion() { + return JOB_SND_NODE_VER.get(); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/950bad47/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java index 5a5b54d..160f2c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedClassDescriptor.java @@ -47,8 +47,11 @@ import java.util.UUID; import java.util.concurrent.ConcurrentMap; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.marshaller.MarshallerContext; import org.apache.ignite.marshaller.MarshallerExclusions; +import org.apache.ignite.internal.util.SerializableTransient; +import org.apache.ignite.marshaller.MarshallerUtils; import static java.lang.reflect.Modifier.isFinal; import static java.lang.reflect.Modifier.isPrivate; @@ -166,6 +169,9 @@ class OptimizedClassDescriptor { /** Proxy interfaces. */ private Class<?>[] proxyIntfs; + /** Method returns serializable transient fields. */ + private Method serTransMtd; + /** * Creates descriptor for class. * @@ -441,6 +447,27 @@ class OptimizedClassDescriptor { readObjMtds.add(mtd); + final SerializableTransient serTransAn = c.getAnnotation(SerializableTransient.class); + + // Custom serialization policy for transient fields. + if (serTransAn != null) { + try { + serTransMtd = c.getDeclaredMethod(serTransAn.methodName(), cls, IgniteProductVersion.class); + + int mod = serTransMtd.getModifiers(); + + if (isStatic(mod) && isPrivate(mod) + && serTransMtd.getReturnType() == String[].class) + serTransMtd.setAccessible(true); + else + // Set method back to null if it has incorrect signature. + serTransMtd = null; + } + catch (NoSuchMethodException ignored) { + serTransMtd = null; + } + } + Field[] clsFields0 = c.getDeclaredFields(); Map<String, Field> fieldNames = new HashMap<>(); @@ -797,7 +824,7 @@ class OptimizedClassDescriptor { writeTypeData(out); out.writeShort(checksum); - out.writeSerializable(obj, writeObjMtds, fields); + out.writeSerializable(obj, writeObjMtds, serializableFields(obj.getClass(), obj, null)); break; @@ -807,6 +834,60 @@ class OptimizedClassDescriptor { } /** + * Gets list of serializable fields. If {@link #serTransMtd} method + * returns list of transient fields, they will be added to other fields. + * Transient fields that are not included in that list will be normally + * ignored. + * + * @param cls Class. + * @param obj Object. + * @param ver Job sender version. + * @return Serializable fields. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private Fields serializableFields(Class<?> cls, Object obj, IgniteProductVersion ver) { + if (serTransMtd == null) + return fields; + + try { + final String[] transFields = (String[])serTransMtd.invoke(cls, obj, ver); + + if (transFields == null || transFields.length == 0) + return fields; + + List<FieldInfo> clsFields = new ArrayList<>(); + + clsFields.addAll(fields.fields.get(0).fields); + + for (int i = 0; i < transFields.length; i++) { + final String fieldName = transFields[i]; + + final Field f = cls.getDeclaredField(fieldName); + + FieldInfo fieldInfo = new FieldInfo(f, f.getName(), + GridUnsafe.objectFieldOffset(f), fieldType(f.getType())); + + clsFields.add(fieldInfo); + } + + Collections.sort(clsFields, new Comparator<FieldInfo>() { + @Override public int compare(FieldInfo t1, FieldInfo t2) { + return t1.name().compareTo(t2.name()); + } + }); + + List<ClassFields> fields = new ArrayList<>(); + + fields.add(new ClassFields(clsFields)); + + return new Fields(fields); + } + catch (Exception e) { + return fields; + } + } + + /** * @param out Output stream. * @throws IOException In case of error. */ @@ -838,7 +919,12 @@ class OptimizedClassDescriptor { case SERIALIZABLE: verifyChecksum(in.readShort()); - return in.readSerializable(cls, readObjMtds, readResolveMtd, fields); + // If no serialize method, then unmarshal as usual. + if (serTransMtd != null) + return in.readSerializable(cls, readObjMtds, readResolveMtd, + serializableFields(cls, null, MarshallerUtils.jobSenderVersion())); + else + return in.readSerializable(cls, readObjMtds, readResolveMtd, fields); default: assert false : "Unexpected type: " + type;
