Repository: ignite Updated Branches: refs/heads/master 6a1aa1f06 -> 947962f78
Fixed a problem with optimized marshaller. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/947962f7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/947962f7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/947962f7 Branch: refs/heads/master Commit: 947962f785698f4c6bb749f4acce4a96651659fd Parents: 6a1aa1f Author: devozerov <[email protected]> Authored: Mon Sep 10 22:55:55 2018 +0300 Committer: devozerov <[email protected]> Committed: Mon Sep 10 22:55:55 2018 +0300 ---------------------------------------------------------------------- .../configuration/TransactionConfiguration.java | 20 +++++ .../optimized/OptimizedClassDescriptor.java | 87 +++++++++++++------- .../service/GridServiceProcessor.java | 3 - .../processors/task/GridTaskWorker.java | 72 +++++++++------- .../internal/util/SerializableTransient.java | 10 +-- .../internal/util/TransientSerializable.java | 55 +++++++++++++ .../ignite/marshaller/MarshallerUtils.java | 22 ++++- 7 files changed, 197 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/947962f7/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java index 0ac215f..e669bcf 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/TransactionConfiguration.java @@ -19,7 +19,9 @@ package org.apache.ignite.configuration; import java.io.Serializable; import javax.cache.configuration.Factory; +import org.apache.ignite.internal.util.TransientSerializable; import 
org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; @@ -27,8 +29,12 @@ import org.apache.ignite.transactions.TransactionIsolation; /** * Transactions configuration. */ +@TransientSerializable(methodName = "transientSerializableFields") public class TransactionConfiguration implements Serializable { /** */ + private static final IgniteProductVersion TX_PME_TIMEOUT_SINCE = IgniteProductVersion.fromString("2.5.1"); + + /** */ private static final long serialVersionUID = 0L; /** Default value for 'txSerializableEnabled' flag. */ @@ -387,4 +393,18 @@ public class TransactionConfiguration implements Serializable { @Override public String toString() { return S.toString(TransactionConfiguration.class, this); } + + /** + * Excludes incompatible fields from serialization/deserialization process. + * + * @param ver Sender/Receiver node version. + * @return Array of excluded from serialization/deserialization fields. 
+ */ + @SuppressWarnings("unused") + private static String[] transientSerializableFields(IgniteProductVersion ver) { + if (TX_PME_TIMEOUT_SINCE.compareToIgnoreTimestamp(ver) >= 0) + return new String[] { "txTimeoutOnPartitionMapExchange" }; + + return null; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/947962f7/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedClassDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedClassDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedClassDescriptor.java index ccd9946..0369b66 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedClassDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedClassDescriptor.java @@ -43,16 +43,18 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentMap; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.SerializableTransient; +import org.apache.ignite.internal.util.TransientSerializable; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.marshaller.MarshallerContext; import org.apache.ignite.marshaller.MarshallerExclusions; -import org.apache.ignite.marshaller.MarshallerUtils; import static java.lang.reflect.Modifier.isFinal; import static java.lang.reflect.Modifier.isPrivate; @@ -92,6 +94,8 @@ import static org.apache.ignite.internal.marshaller.optimized.OptimizedMarshalle import static org.apache.ignite.internal.marshaller.optimized.OptimizedMarshallerUtils.STR; import 
static org.apache.ignite.internal.marshaller.optimized.OptimizedMarshallerUtils.UUID; import static org.apache.ignite.internal.marshaller.optimized.OptimizedMarshallerUtils.computeSerialVersionUid; +import static org.apache.ignite.marshaller.MarshallerUtils.jobReceiverVersion; +import static org.apache.ignite.marshaller.MarshallerUtils.jobSenderVersion; /** * Class descriptor. @@ -172,6 +176,9 @@ class OptimizedClassDescriptor { /** Method returns serializable transient fields. */ private Method serTransMtd; + /** Method returns transient serializable fields. */ + private Method transSerMtd; + /** * Creates descriptor for class. * @@ -448,16 +455,16 @@ class OptimizedClassDescriptor { readObjMtds.add(mtd); final SerializableTransient serTransAn = c.getAnnotation(SerializableTransient.class); + final TransientSerializable transSerAn = c.getAnnotation(TransientSerializable.class); // Custom serialization policy for transient fields. if (serTransAn != null) { try { - serTransMtd = c.getDeclaredMethod(serTransAn.methodName(), cls, IgniteProductVersion.class); + serTransMtd = c.getDeclaredMethod(serTransAn.methodName(), IgniteProductVersion.class); int mod = serTransMtd.getModifiers(); - if (isStatic(mod) && isPrivate(mod) - && serTransMtd.getReturnType() == String[].class) + if (isStatic(mod) && isPrivate(mod) && serTransMtd.getReturnType() == String[].class) serTransMtd.setAccessible(true); else // Set method back to null if it has incorrect signature. @@ -468,6 +475,24 @@ class OptimizedClassDescriptor { } } + // Custom serialization policy for non-transient fields. + if (transSerAn != null) { + try { + transSerMtd = c.getDeclaredMethod(transSerAn.methodName(), IgniteProductVersion.class); + + int mod = transSerMtd.getModifiers(); + + if (isStatic(mod) && isPrivate(mod) && transSerMtd.getReturnType() == String[].class) + transSerMtd.setAccessible(true); + else + // Set method back to null if it has incorrect signature. 
+ transSerMtd = null; + } + catch (NoSuchMethodException ignored) { + transSerMtd = null; + } + } + Field[] clsFields0 = c.getDeclaredFields(); Map<String, Field> fieldNames = new HashMap<>(); @@ -824,7 +849,7 @@ class OptimizedClassDescriptor { writeTypeData(out); out.writeShort(checksum); - out.writeSerializable(obj, writeObjMtds, serializableFields(obj.getClass(), obj, null)); + out.writeSerializable(obj, writeObjMtds, fields(obj.getClass(), jobReceiverVersion())); break; @@ -840,45 +865,52 @@ class OptimizedClassDescriptor { * ignored. * * @param cls Class. - * @param obj Object. * @param ver Job sender version. * @return Serializable fields. */ @SuppressWarnings("ForLoopReplaceableByForEach") - private Fields serializableFields(Class<?> cls, Object obj, IgniteProductVersion ver) { - if (serTransMtd == null) + private Fields fields(Class<?> cls, IgniteProductVersion ver) { + if (ver == null // No context available. + || serTransMtd == null && transSerMtd == null) return fields; try { - final String[] transFields = (String[])serTransMtd.invoke(cls, obj, ver); + final String[] transFields = serTransMtd == null ? null : (String[])serTransMtd.invoke(null, ver); + final String[] serFields = transSerMtd == null ? 
null : (String[])transSerMtd.invoke(null, ver); - if (transFields == null || transFields.length == 0) + if (F.isEmpty(transFields) && F.isEmpty(serFields)) return fields; - List<FieldInfo> clsFields = new ArrayList<>(); + Map<String, FieldInfo> clsFields = new TreeMap<>(); - clsFields.addAll(fields.fields.get(0).fields); + for (FieldInfo field : fields.fields.get(0).fields) { + clsFields.put(field.fieldName, field); + } - for (int i = 0; i < transFields.length; i++) { - final String fieldName = transFields[i]; + // Add serializable transient fields + if (!F.isEmpty(transFields)) { + for (int i = 0; i < transFields.length; i++) { + final String fieldName = transFields[i]; - final Field f = cls.getDeclaredField(fieldName); + final Field f = cls.getDeclaredField(fieldName); - FieldInfo fieldInfo = new FieldInfo(f, f.getName(), - GridUnsafe.objectFieldOffset(f), fieldType(f.getType())); + FieldInfo fieldInfo = new FieldInfo(f, f.getName(), + GridUnsafe.objectFieldOffset(f), fieldType(f.getType())); - clsFields.add(fieldInfo); + clsFields.put(fieldName, fieldInfo); + } } - Collections.sort(clsFields, new Comparator<FieldInfo>() { - @Override public int compare(FieldInfo t1, FieldInfo t2) { - return t1.name().compareTo(t2.name()); + // Exclude non-transient fields which shouldn't be serialized. + if (!F.isEmpty(serFields)) { + for (int i = 0; i < serFields.length; i++) { + clsFields.remove(serFields[i]); } - }); + } - List<ClassFields> fields = new ArrayList<>(); + List<ClassFields> fields = new ArrayList<>(1); - fields.add(new ClassFields(clsFields)); + fields.add(new ClassFields(new ArrayList<>(clsFields.values()))); return new Fields(fields); } @@ -919,12 +951,7 @@ class OptimizedClassDescriptor { case SERIALIZABLE: verifyChecksum(in.readShort()); - // If no serialize method, then unmarshal as usual. 
- if (serTransMtd != null) - return in.readSerializable(cls, readObjMtds, readResolveMtd, - serializableFields(cls, null, MarshallerUtils.jobSenderVersion())); - else - return in.readSerializable(cls, readObjMtds, readResolveMtd, fields); + return in.readSerializable(cls, readObjMtds, readResolveMtd, fields(cls, jobSenderVersion())); default: assert false : "Unexpected type: " + type; http://git-wip-us.apache.org/repos/asf/ignite/blob/947962f7/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 9e8c7fa..fce7053 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -69,7 +69,6 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.query.CacheQuery; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; @@ -79,7 +78,6 @@ import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridEmptyIterator; import 
org.apache.ignite.internal.util.GridSpinBusyLock; -import org.apache.ignite.internal.util.SerializableTransient; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -2127,7 +2125,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite /** */ @GridInternal - @SerializableTransient(methodName = "serializableTransient") private static class ServiceTopologyCallable implements IgniteCallable<Map<UUID, Integer>> { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/947962f7/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index 82ad5d5..b73737c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -84,6 +84,7 @@ import org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.marshaller.MarshallerUtils; import org.apache.ignite.resources.TaskContinuousMapperResource; import org.jetbrains.annotations.Nullable; @@ -1375,38 +1376,45 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { boolean forceLocDep = internal || !ctx.deploy().enabled(); - req = new GridJobExecuteRequest( - ses.getId(), - res.getJobContext().getJobId(), - ses.getTaskName(), - ses.getUserVersion(), - ses.getTaskClassName(), - loc 
? null : U.marshal(marsh, res.getJob()), - loc ? res.getJob() : null, - ses.getStartTime(), - timeout, - ses.getTopology(), - loc ? ses.getTopologyPredicate() : null, - loc ? null : U.marshal(marsh, ses.getTopologyPredicate()), - loc ? null : U.marshal(marsh, ses.getJobSiblings()), - loc ? ses.getJobSiblings() : null, - loc ? null : U.marshal(marsh, sesAttrs), - loc ? sesAttrs : null, - loc ? null : U.marshal(marsh, jobAttrs), - loc ? jobAttrs : null, - ses.getCheckpointSpi(), - dep.classLoaderId(), - dep.deployMode(), - continuous, - dep.participants(), - forceLocDep, - ses.isFullSupport(), - internal, - subjId, - affCacheIds, - affPartId, - mapTopVer, - ses.executorName()); + try { + MarshallerUtils.jobReceiverVersion(node.version()); + + req = new GridJobExecuteRequest( + ses.getId(), + res.getJobContext().getJobId(), + ses.getTaskName(), + ses.getUserVersion(), + ses.getTaskClassName(), + loc ? null : U.marshal(marsh, res.getJob()), + loc ? res.getJob() : null, + ses.getStartTime(), + timeout, + ses.getTopology(), + loc ? ses.getTopologyPredicate() : null, + loc ? null : U.marshal(marsh, ses.getTopologyPredicate()), + loc ? null : U.marshal(marsh, ses.getJobSiblings()), + loc ? ses.getJobSiblings() : null, + loc ? null : U.marshal(marsh, sesAttrs), + loc ? sesAttrs : null, + loc ? null : U.marshal(marsh, jobAttrs), + loc ? 
jobAttrs : null, + ses.getCheckpointSpi(), + dep.classLoaderId(), + dep.deployMode(), + continuous, + dep.participants(), + forceLocDep, + ses.isFullSupport(), + internal, + subjId, + affCacheIds, + affPartId, + mapTopVer, + ses.executorName()); + } + finally { + MarshallerUtils.jobReceiverVersion(null); + } if (loc) ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req); http://git-wip-us.apache.org/repos/asf/ignite/blob/947962f7/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java index 14a2f27..e016009 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/SerializableTransient.java @@ -37,19 +37,17 @@ import java.lang.annotation.Target; public @interface SerializableTransient { /** * Name of the private static method that returns list of transient fields - * that should be serialized (String[]), and accepts itself (before serialization) - * and {@link IgniteProductVersion}, e.g. + * that should be serialized (String[]), and accepts {@link IgniteProductVersion}, e.g. * <pre> - * private static String[] fields(Object self, IgniteProductVersion ver){ + * private static String[] fields(IgniteProductVersion ver){ * return ver.compareTo("1.5.30") > 0 ? SERIALIZABLE_FIELDS : null; * } * </pre> * <p> - * On serialization version argument <tt>ver</tt> is null, on deserialization - <tt>self</tt> is null. + * On serialization version argument <tt>ver</tt> is receiver version and sender version on deserialization. * </p> * <p> - * If it returns empty array or null all transient fields will be normally - * ignored. 
+ * If it returns empty array or null all transient fields will be normally ignored. * </p> * * @return Name of the method. http://git-wip-us.apache.org/repos/asf/ignite/blob/947962f7/modules/core/src/main/java/org/apache/ignite/internal/util/TransientSerializable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/TransientSerializable.java b/modules/core/src/main/java/org/apache/ignite/internal/util/TransientSerializable.java new file mode 100644 index 0000000..b583c1b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/TransientSerializable.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import org.apache.ignite.lang.IgniteProductVersion; + +/** + * Marks class as it has non-transient fields that should not be serialized. + * Annotated class must have method that returns list of non-transient + * fields that should not be serialized. 
+ * <p> + * Works only for jobs. For other messages node version is not available. + * </p> + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface TransientSerializable { + /** + * Name of the private static method that returns list of non-transient fields + * that should not be serialized (String[]), and accepts {@link IgniteProductVersion}, e.g. + * <pre> + * private static String[] fields(IgniteProductVersion ver){ + * return ver.compareTo("1.5.30") < 0 ? EXCLUDED_FIELDS : null; + * } + * </pre> + * <p> + * On serialization version argument <tt>ver</tt> is receiver version and sender version on deserialization. + * </p> + * <p> + * If it returns empty array or null all non-transient fields will be normally serialized. + * </p> + * + * @return Name of the method. + */ + String methodName(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/947962f7/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java index f7fef52..d5cf386 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerUtils.java @@ -45,6 +45,9 @@ public class MarshallerUtils { /** Job sender node version. */ private static final ThreadLocal<IgniteProductVersion> JOB_SND_NODE_VER = new ThreadLocal<>(); + /** Job receiver node version. */ + private static final ThreadLocal<IgniteProductVersion> JOB_RCV_NODE_VER = new ThreadLocal<>(); + /** * Set node name to marshaller context if possible. * @@ -96,6 +99,24 @@ public class MarshallerUtils { } /** + * Sets thread local job receiver node version. + * + * @param ver Thread local job receiver node version. 
+ */ + public static void jobReceiverVersion(IgniteProductVersion ver) { + JOB_RCV_NODE_VER.set(ver); + } + + /** + * Returns thread local job receiver node version. + * + * @return Thread local job receiver node version. + */ + public static IgniteProductVersion jobReceiverVersion() { + return JOB_RCV_NODE_VER.get(); + } + + /** * Returns class name filter for marshaller. * * @return Class name filter for marshaller. @@ -199,5 +220,4 @@ public class MarshallerUtils { "[path=" + fileName + ']', e); } } - }
