http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectInputStream.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectInputStream.java index f7e5618,0000000..b6b3774 mode 100644,000000..100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectInputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectInputStream.java @@@ -1,1016 -1,0 +1,1018 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.marshaller.optimized; + +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.io.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import sun.misc.*; + +import java.io.*; +import java.lang.reflect.*; +import java.util.*; + +import static org.apache.ignite.marshaller.optimized.OptimizedMarshallerUtils.*; + +/** + * Optimized object input stream. + */ +class OptimizedObjectInputStream extends ObjectInputStream { + /** Unsafe. */ + private static final Unsafe UNSAFE = GridUnsafe.unsafe(); + + /** Dummy object for HashSet. */ + private static final Object DUMMY = new Object(); + + /** */ + private final HandleTable handles = new HandleTable(10); + + /** */ + private ClassLoader clsLdr; + + /** */ + private GridDataInput in; + + /** */ + private Object curObj; + + /** */ + private List<T2<OptimizedFieldType, Long>> curFields; + + /** */ + private List<IgniteBiTuple<Integer, OptimizedFieldType>> curFieldInfoList; + + /** */ + private Map<String, IgniteBiTuple<Integer, OptimizedFieldType>> curFieldInfoMap; + + /** */ + private Class<?> curCls; + + /** + * @param in Input. + * @throws IOException In case of error. + */ + OptimizedObjectInputStream(GridDataInput in) throws IOException { + this.in = in; + } + + /** + * @throws IOException In case of error. + */ + OptimizedObjectInputStream() throws IOException { + // No-op. + } + + /** + * @param clsLdr Class loader. + */ + void classLoader(ClassLoader clsLdr) { + this.clsLdr = clsLdr; + } + + /** + * @return Class loader. + */ + ClassLoader classLoader() { + return clsLdr; + } + + /** + * @return Input. + */ + public GridDataInput in() { + return in; + } + + /** + * @param in Input. + */ + public void in(GridDataInput in) { + this.in = in; + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + reset(); ++ ++ clsLdr = null; + } + + /** {@inheritDoc} */ + @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod") + @Override public void reset() throws IOException { + in.reset(); + handles.clear(); + + curObj = null; + curFields = null; + curFieldInfoList = null; + curFieldInfoMap = null; + } + + /** {@inheritDoc} */ + @Override public Object readObjectOverride() throws ClassNotFoundException, IOException { + curObj = null; + curFields = null; + curFieldInfoList = null; + curFieldInfoMap = null; + + byte ref = in.readByte(); + + switch (ref) { + case NULL: + return null; + + case HANDLE: + return handles.lookup(readInt()); + + case OBJECT: + OptimizedClassDescriptor desc = OptimizedClassResolver.readClass(this, clsLdr); + + curCls = desc.describedClass(); + + return desc.read(this); + + default: + SB msg = new SB("Unexpected error occurred during unmarshalling"); + + if (curCls != null) + msg.a(" of an instance of the class: ").a(curCls.getName()); + + msg.a(". Check that all nodes are running the same version of GridGain and that all nodes have " + + "GridOptimizedMarshaller configured with identical optimized classes lists, if any " + + "(see setClassNames and setClassNamesPath methods). If your serialized classes implement " + + "java.io.Externalizable interface, verify that serialization logic is correct."); + + throw new IOException(msg.toString()); + } + } + + /** + * Reads array from this stream. + * + * @param compType Array component type. + * @return Array. + * @throws ClassNotFoundException If class not found. + * @throws IOException In case of error. + */ + <T> T[] readArray(Class<T> compType) throws ClassNotFoundException, IOException { + int len = in.readInt(); + + T[] arr = (T[])Array.newInstance(compType, len); + + handles.assign(arr); + + for (int i = 0; i < len; i++) + arr[i] = (T)readObject(); + + return arr; + } + + /** + * Reads {@link UUID} from this stream. + * + * @return UUID. + * @throws IOException In case of error. + */ + UUID readUuid() throws IOException { + UUID uuid = new UUID(readLong(), readLong()); + + handles.assign(uuid); + + return uuid; + } + + /** + * Reads {@link Properties} from this stream. + * + * @return Properties. + * @throws ClassNotFoundException If class not found. + * @throws IOException In case of error. + */ + Properties readProperties() throws ClassNotFoundException, IOException { + Properties dflts = readBoolean() ? null : (Properties)readObject(); + + Properties props = new Properties(dflts); + + int size = in.readInt(); + + for (int i = 0; i < size; i++) + props.setProperty(readUTF(), readUTF()); + + handles.assign(props); + + return props; + } + + /** + * Reads and sets all non-static and non-transient field values from this stream. + * + * @param obj Object. + * @param fieldOffs Field offsets. + * @throws ClassNotFoundException If class not found. + * @throws IOException In case of error. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + void readFields(Object obj, List<T2<OptimizedFieldType, Long>> fieldOffs) throws ClassNotFoundException, + IOException { + for (int i = 0; i < fieldOffs.size(); i++) { + T2<OptimizedFieldType, Long> t = fieldOffs.get(i); + + switch ((t.get1())) { + case BYTE: + setByte(obj, t.get2(), readByte()); + + break; + + case SHORT: + setShort(obj, t.get2(), readShort()); + + break; + + case INT: + setInt(obj, t.get2(), readInt()); + + break; + + case LONG: + setLong(obj, t.get2(), readLong()); + + break; + + case FLOAT: + setFloat(obj, t.get2(), readFloat()); + + break; + + case DOUBLE: + setDouble(obj, t.get2(), readDouble()); + + break; + + case CHAR: + setChar(obj, t.get2(), readChar()); + + break; + + case BOOLEAN: + setBoolean(obj, t.get2(), readBoolean()); + + break; + + case OTHER: + setObject(obj, t.get2(), readObject()); + } + } + } + + /** + * Reads {@link Externalizable} object. + * + * @param constructor Constructor. + * @param readResolveMtd {@code readResolve} method. + * @return Object. + * @throws ClassNotFoundException If class not found. + * @throws IOException In case of error. + */ + Object readExternalizable(Constructor<?> constructor, Method readResolveMtd) + throws ClassNotFoundException, IOException { + Object obj; + + try { + obj = constructor.newInstance(); + } + catch (InstantiationException | InvocationTargetException | IllegalAccessException e) { + throw new IOException(e); + } + + int handle = handles.assign(obj); + + Externalizable extObj = ((Externalizable)obj); + + extObj.readExternal(this); + + if (readResolveMtd != null) { + try { + obj = readResolveMtd.invoke(obj); + + handles.set(handle, obj); + } + catch (IllegalAccessException | InvocationTargetException e) { + throw new IOException(e); + } + } + + return obj; + } + + /** + * Reads serializable object. + * + * @param cls Class. + * @param mtds {@code readObject} methods. + * @param readResolveMtd {@code readResolve} method. + * @param fields class fields details. + * @return Object. + * @throws ClassNotFoundException If class not found. + * @throws IOException In case of error. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + Object readSerializable(Class<?> cls, List<Method> mtds, Method readResolveMtd, + OptimizedClassDescriptor.Fields fields) throws ClassNotFoundException, IOException { + Object obj; + + try { + obj = UNSAFE.allocateInstance(cls); + } + catch (InstantiationException e) { + throw new IOException(e); + } + + int handle = handles.assign(obj); + + for (int i = 0; i < mtds.size(); i++) { + Method mtd = mtds.get(i); + + if (mtd != null) { + curObj = obj; + curFields = fields.fieldOffs(i); + curFieldInfoList = fields.fieldInfoList(i); + curFieldInfoMap = fields.fieldInfoMap(i); + + try { + mtd.invoke(obj, this); + } + catch (IllegalAccessException | InvocationTargetException e) { + throw new IOException(e); + } + } + else + readFields(obj, fields.fieldOffs(i)); + } + + if (readResolveMtd != null) { + try { + obj = readResolveMtd.invoke(obj); + + handles.set(handle, obj); + } + catch (IllegalAccessException | InvocationTargetException e) { + throw new IOException(e); + } + } + + return obj; + } + + /** + * Reads {@link ArrayList}. + * + * @return List. + * @throws ClassNotFoundException If class not found. + * @throws IOException In case of error. + */ + ArrayList<?> readArrayList() throws ClassNotFoundException, IOException { + int size = readInt(); + + ArrayList<Object> list = new ArrayList<>(size); + + handles.assign(list); + + for (int i = 0; i < size; i++) + list.add(readObject()); + + return list; + } + + /** + * Reads {@link HashMap}. + * + * @param set Whether reading underlying map from {@link HashSet}. + * @return Map. + * @throws ClassNotFoundException If class not found. + * @throws IOException In case of error. + */ + HashMap<?, ?> readHashMap(boolean set) throws ClassNotFoundException, IOException { + int size = readInt(); + float loadFactor = readFloat(); + + HashMap<Object, Object> map = new HashMap<>(size, loadFactor); + + if (!set) + handles.assign(map); + + for (int i = 0; i < size; i++) { + Object key = readObject(); + Object val = !set ? readObject() : DUMMY; + + map.put(key, val); + } + + return map; + } + + /** + * Reads {@link HashSet}. + * + * @param mapFieldOff Map field offset. + * @return Set. + * @throws ClassNotFoundException If class not found. + * @throws IOException In case of error. + */ + HashSet<?> readHashSet(long mapFieldOff) throws ClassNotFoundException, IOException { + try { + HashSet<Object> set = (HashSet<Object>)UNSAFE.allocateInstance(HashSet.class); + + handles.assign(set); + + setObject(set, mapFieldOff, readHashMap(true)); + + return set; + } + catch (InstantiationException e) { + throw new IOException(e); + } + } + + /** + * Reads {@link LinkedList}. + * + * @return List. + * @throws ClassNotFoundException If class not found. + * @throws IOException In case of error. + */ + LinkedList<?> readLinkedList() throws ClassNotFoundException, IOException { + int size = readInt(); + + LinkedList<Object> list = new LinkedList<>(); + + handles.assign(list); + + for (int i = 0; i < size; i++) + list.add(readObject()); + + return list; + } + + /** + * Reads {@link LinkedHashMap}. + * + * @param set Whether reading underlying map from {@link LinkedHashSet}. + * @return Map. + * @throws ClassNotFoundException If class not found. + * @throws IOException In case of error. + */ + LinkedHashMap<?, ?> readLinkedHashMap(boolean set) throws ClassNotFoundException, IOException { + int size = readInt(); + float loadFactor = readFloat(); + boolean accessOrder = readBoolean(); + + LinkedHashMap<Object, Object> map = new LinkedHashMap<>(size, loadFactor, accessOrder); + + if (!set) + handles.assign(map); + + for (int i = 0; i < size; i++) { + Object key = readObject(); + Object val = !set ? readObject() : DUMMY; + + map.put(key, val); + } + + return map; + } + + /** + * Reads {@link LinkedHashSet}. + * + * @param mapFieldOff Map field offset. + * @return Set. + * @throws ClassNotFoundException If class not found. + * @throws IOException In case of error. + */ + LinkedHashSet<?> readLinkedHashSet(long mapFieldOff) throws ClassNotFoundException, IOException { + try { + LinkedHashSet<Object> set = (LinkedHashSet<Object>)UNSAFE.allocateInstance(LinkedHashSet.class); + + handles.assign(set); + + setObject(set, mapFieldOff, readLinkedHashMap(true)); + + return set; + } + catch (InstantiationException e) { + throw new IOException(e); + } + } + + /** + * Reads {@link Date}. + * + * @return Date. + * @throws ClassNotFoundException If class not found. + * @throws IOException In case of error. + */ + Date readDate() throws ClassNotFoundException, IOException { + Date date = new Date(readLong()); + + handles.assign(date); + + return date; + } + + /** + * Reads array of {@code byte}s. + * + * @return Array. + * @throws IOException In case of error. + */ + byte[] readByteArray() throws IOException { + byte[] arr = in.readByteArray(); + + handles.assign(arr); + + return arr; + } + + /** + * Reads array of {@code short}s. + * + * @return Array. + * @throws IOException In case of error. + */ + short[] readShortArray() throws IOException { + short[] arr = in.readShortArray(); + + handles.assign(arr); + + return arr; + } + + /** + * Reads array of {@code int}s. + * + * @return Array. + * @throws IOException In case of error. + */ + int[] readIntArray() throws IOException { + int[] arr = in.readIntArray(); + + handles.assign(arr); + + return arr; + } + + /** + * Reads array of {@code long}s. + * + * @return Array. + * @throws IOException In case of error. + */ + long[] readLongArray() throws IOException { + long[] arr = in.readLongArray(); + + handles.assign(arr); + + return arr; + } + + /** + * Reads array of {@code float}s. + * + * @return Array. + * @throws IOException In case of error. + */ + float[] readFloatArray() throws IOException { + float[] arr = in.readFloatArray(); + + handles.assign(arr); + + return arr; + } + + /** + * Reads array of {@code double}s. + * + * @return Array. + * @throws IOException In case of error. + */ + double[] readDoubleArray() throws IOException { + double[] arr = in.readDoubleArray(); + + handles.assign(arr); + + return arr; + } + + /** + * Reads array of {@code char}s. + * + * @return Array. + * @throws IOException In case of error. + */ + char[] readCharArray() throws IOException { + char[] arr = in.readCharArray(); + + handles.assign(arr); + + return arr; + } + + /** + * Reads array of {@code boolean}s. + * + * @return Array. + * @throws IOException In case of error. + */ + boolean[] readBooleanArray() throws IOException { + boolean[] arr = in.readBooleanArray(); + + handles.assign(arr); + + return arr; + } + + /** + * Reads {@link String}. + * + * @return String. + * @throws IOException In case of error. + */ + public String readString() throws IOException { + String str = in.readUTF(); + + handles.assign(str); + + return str; + } + + /** {@inheritDoc} */ + @Override public void readFully(byte[] b) throws IOException { + in.readFully(b); + } + + /** {@inheritDoc} */ + @Override public void readFully(byte[] b, int off, int len) throws IOException { + in.readFully(b, off, len); + } + + /** {@inheritDoc} */ + @Override public int skipBytes(int n) throws IOException { + return in.skipBytes(n); + } + + /** {@inheritDoc} */ + @Override public boolean readBoolean() throws IOException { + return in.readBoolean(); + } + + /** {@inheritDoc} */ + @Override public byte readByte() throws IOException { + return in.readByte(); + } + + /** {@inheritDoc} */ + @Override public int readUnsignedByte() throws IOException { + return in.readUnsignedByte(); + } + + /** {@inheritDoc} */ + @Override public short readShort() throws IOException { + return in.readShort(); + } + + /** {@inheritDoc} */ + @Override public int readUnsignedShort() throws IOException { + return in.readUnsignedShort(); + } + + /** {@inheritDoc} */ + @Override public char readChar() throws IOException { + return in.readChar(); + } + + /** {@inheritDoc} */ + @Override public int readInt() throws IOException { + return in.readInt(); + } + + /** {@inheritDoc} */ + @Override public long readLong() throws IOException { + return in.readLong(); + } + + /** {@inheritDoc} */ + @Override public float readFloat() throws IOException { + return in.readFloat(); + } + + /** {@inheritDoc} */ + @Override public double readDouble() throws IOException { + return in.readDouble(); + } + + /** {@inheritDoc} */ + @Override public int read() throws IOException { + return in.read(); + } + + /** {@inheritDoc} */ + @Override public int read(byte[] b) throws IOException { + return in.read(b); + } + + /** {@inheritDoc} */ + @Override public int read(byte[] b, int off, int len) throws IOException { + return in.read(b, off, len); + } + + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public String readLine() throws IOException { + return in.readLine(); + } + + /** {@inheritDoc} */ + @Override public String readUTF() throws IOException { + return in.readUTF(); + } + + /** {@inheritDoc} */ + @Override public Object readUnshared() throws IOException, ClassNotFoundException { + return readObject(); + } + + /** {@inheritDoc} */ + @Override public void defaultReadObject() throws IOException, ClassNotFoundException { + if (curObj == null) + throw new NotActiveException("Not in readObject() call."); + + readFields(curObj, curFields); + } + + /** {@inheritDoc} */ + @Override public ObjectInputStream.GetField readFields() throws IOException, ClassNotFoundException { + if (curObj == null) + throw new NotActiveException("Not in readObject() call."); + + return new GetFieldImpl(this); + } + + /** {@inheritDoc} */ + @Override public void registerValidation(ObjectInputValidation obj, int pri) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public int available() throws IOException { + return -1; + } + + /** + * Returns objects that were added to handles table. + * Used ONLY for test purposes. + * + * @return Handled objects. + */ + Object[] handledObjects() { + return handles.entries; + } + + /** + * Lightweight identity hash table which maps objects to integer handles, + * assigned in ascending order. + */ + private static class HandleTable { + /** Array mapping handle -> object/exception (depending on status). */ + private Object[] entries; + + /** Number of handles in table. */ + private int size; + + /** + * Creates handle table with the given initial capacity. + * + * @param initCap Initial capacity. + */ + HandleTable(int initCap) { + entries = new Object[initCap]; + } + + /** + * Assigns next available handle to given object, and returns assigned + * handle. + * + * @param obj Object. + * @return Handle. + */ + int assign(Object obj) { + if (size >= entries.length) + grow(); + + entries[size] = obj; + + return size++; + } + + /** + * Assigns new object to existing handle. Old object is forgotten. + * + * @param handle Handle. + * @param obj Object. + */ + void set(int handle, Object obj) { + entries[handle] = obj; + } + + /** + * Looks up and returns object associated with the given handle. + * + * @param handle Handle. + * @return Object. + */ + Object lookup(int handle) { + return entries[handle]; + } + + /** + * Resets table to its initial state. + */ + void clear() { + Arrays.fill(entries, 0, size, null); + + size = 0; + } + + /** + * Expands capacity of internal arrays. + */ + private void grow() { + int newCap = (entries.length << 1) + 1; + + Object[] newEntries = new Object[newCap]; + + System.arraycopy(entries, 0, newEntries, 0, size); + + entries = newEntries; + } + } + + /** + * {@link GetField} implementation. + */ + private static class GetFieldImpl extends GetField { + /** Field info map. */ + private final Map<String, IgniteBiTuple<Integer, OptimizedFieldType>> fieldInfoMap; + + /** Values. */ + private final Object[] objs; + + /** + * @param in Stream. + * @throws IOException In case of error. + * @throws ClassNotFoundException If class not found. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private GetFieldImpl(OptimizedObjectInputStream in) throws IOException, ClassNotFoundException { + fieldInfoMap = in.curFieldInfoMap; + + List<IgniteBiTuple<Integer, OptimizedFieldType>> infos = in.curFieldInfoList; + + objs = new Object[infos.size()]; + + for (int i = 0; i < infos.size(); i++) { + IgniteBiTuple<Integer, OptimizedFieldType> t = infos.get(i); + + Object obj = null; + + switch (t.get2()) { + case BYTE: + obj = in.readByte(); + + break; + + case SHORT: + obj = in.readShort(); + + break; + + case INT: + obj = in.readInt(); + + break; + + case LONG: + obj = in.readLong(); + + break; + + case FLOAT: + obj = in.readFloat(); + + break; + + case DOUBLE: + obj = in.readDouble(); + + break; + + case CHAR: + obj = in.readChar(); + + break; + + case BOOLEAN: + obj = in.readBoolean(); + + break; + + case OTHER: + obj = in.readObject(); + } + + objs[t.get1()] = obj; + } + } + + /** {@inheritDoc} */ + @Override public ObjectStreamClass getObjectStreamClass() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean defaulted(String name) throws IOException { + return objs[fieldInfoMap.get(name).get1()] == null; + } + + /** {@inheritDoc} */ + @Override public boolean get(String name, boolean dflt) throws IOException { + return value(name, dflt); + } + + /** {@inheritDoc} */ + @Override public byte get(String name, byte dflt) throws IOException { + return value(name, dflt); + } + + /** {@inheritDoc} */ + @Override public char get(String name, char dflt) throws IOException { + return value(name, dflt); + } + + /** {@inheritDoc} */ + @Override public short get(String name, short dflt) throws IOException { + return value(name, dflt); + } + + /** {@inheritDoc} */ + @Override public int get(String name, int dflt) throws IOException { + return value(name, dflt); + } + + /** {@inheritDoc} */ + @Override public long get(String name, long dflt) throws IOException { + return value(name, dflt); + } + + /** {@inheritDoc} */ + @Override public float get(String name, float dflt) throws IOException { + return value(name, dflt); + } + + /** {@inheritDoc} */ + @Override public double get(String name, double dflt) throws IOException { + return value(name, dflt); + } + + /** {@inheritDoc} */ + @Override public Object get(String name, Object dflt) throws IOException { + return value(name, dflt); + } + + /** + * @param name Field name. + * @param dflt Default value. + * @return Value. + */ + private <T> T value(String name, T dflt) { + return objs[fieldInfoMap.get(name).get1()] != null ? (T)objs[fieldInfoMap.get(name).get1()] : dflt; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/main/java/org/apache/ignite/messaging/MessagingListenActor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java index 1c28403,69f6c3e..4e817a3 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java @@@ -581,9 -582,9 +581,9 @@@ public class GridJobMasterLeaveAwareSel /** * Master leave aware callable. */ - private static class TestCallable implements Callable<Void>, ComputeJobMasterLeaveAware { + private static class TestCallable implements IgniteCallable<Void>, ComputeJobMasterLeaveAware { /** Task session. */ - @IgniteLoggerResource + @LoggerResource private IgniteLogger log; /** */ @@@ -605,9 -606,9 +605,9 @@@ /** * Master leave aware runnable. */ - private static class TestRunnable implements Runnable, ComputeJobMasterLeaveAware { + private static class TestRunnable implements IgniteRunnable, ComputeJobMasterLeaveAware { /** Task session. */ - @IgniteLoggerResource + @LoggerResource private IgniteLogger log; /** */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/test/java/org/apache/ignite/internal/GridManagementJobSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/GridManagementJobSelfTest.java index ffa9579,7ca423b..eb9914f --- a/modules/core/src/test/java/org/apache/ignite/internal/GridManagementJobSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridManagementJobSelfTest.java @@@ -134,9 -135,9 +135,9 @@@ public class GridManagementJobSelfTest * Test job which ensures that its executor thread is from management pool in case * task name corresponds to either internal or Visor task. */ - private static class TestJob implements Callable<Object>, Serializable { + private static class TestJob implements IgniteCallable<Object> { /** Task session. */ - @IgniteTaskSessionResource + @TaskSessionResource protected ComputeTaskSession ses; /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/test/java/org/apache/ignite/internal/GridProjectionAbstractTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionContextSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeEmptyClusterGroupTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueMultiNodeAbstractSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearEvictionSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedAffinitySelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/test/java/org/apache/ignite/internal/processors/closure/GridClosureProcessorSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/test/java/org/apache/ignite/marshaller/GridMarshallerAbstractTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/ssh/src/main/java/org/apache/ignite/internal/util/nodestart/IgniteNodeCallableImpl.java ---------------------------------------------------------------------- diff --cc modules/ssh/src/main/java/org/apache/ignite/internal/util/nodestart/IgniteNodeCallableImpl.java index 0000000,9d7d091..d48ef6d mode 000000,100644..100644 --- a/modules/ssh/src/main/java/org/apache/ignite/internal/util/nodestart/IgniteNodeCallableImpl.java +++ b/modules/ssh/src/main/java/org/apache/ignite/internal/util/nodestart/IgniteNodeCallableImpl.java @@@ -1,0 -1,344 +1,344 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.util.nodestart; + + import com.jcraft.jsch.*; + import org.apache.ignite.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.util.lang.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.resources.*; + + import java.io.*; + import java.text.*; + import java.util.*; + + import static org.apache.ignite.IgniteSystemProperties.*; + + /** + * SSH-based node starter. + */ + public class IgniteNodeCallableImpl implements IgniteNodeCallable { + /** Default Ignite home path for Windows (taken from environment variable). */ + private static final String DFLT_IGNITE_HOME_WIN = "%IGNITE_HOME%"; + + /** Default Ignite home path for Linux (taken from environment variable). */ + private static final String DFLT_IGNITE_HOME_LINUX = "$IGNITE_HOME"; + + /** Default start script path for Windows. */ + private static final String DFLT_SCRIPT_WIN = "bin\\ignite.bat -v -np"; + + /** Default start script path for Linux. */ + private static final String DFLT_SCRIPT_LINUX = "bin/ignite.sh -v"; + + /** + * Logs folder for Windows. + * Folder for linux is configured in {@code ignite-log4j.xml}. + */ + private static final String LOG_DIR_WIN = "work\\log"; + + /** Windows service executable. */ + private static final String SVC_EXE = "bin\\include\\igniteservice.exe"; + + /** Date format for log file name. */ + private static final SimpleDateFormat FILE_NAME_DATE_FORMAT = new SimpleDateFormat("MM-dd-yyyy--HH-mm-ss"); + + /** Specification. */ + private final IgniteRemoteStartSpecification spec; + + /** Connection timeout. */ + private final int timeout; + + /** Logger. */ - @IgniteLoggerResource ++ @LoggerResource + private IgniteLogger log; + + /** + * Required by Externalizable. + */ + public IgniteNodeCallableImpl() { + spec = null; + timeout = 0; + + assert false; + } + + /** + * Constructor. + * + * @param spec Specification. + * @param timeout Connection timeout. + */ + public IgniteNodeCallableImpl(IgniteRemoteStartSpecification spec, int timeout) { + assert spec != null; + + this.spec = spec; + this.timeout = timeout; + } + + /** {@inheritDoc} */ + @Override public GridTuple3<String, Boolean, String> call() { + JSch ssh = new JSch(); + + Session ses = null; + + try { + if (spec.key() != null) + ssh.addIdentity(spec.key().getAbsolutePath()); + + ses = ssh.getSession(spec.username(), spec.host(), spec.port()); + + if (spec.password() != null) + ses.setPassword(spec.password()); + + ses.setConfig("StrictHostKeyChecking", "no"); + + ses.connect(timeout); + + boolean win = isWindows(ses); + + char separator = win ? '\\' : '/'; + + spec.fixPaths(separator); + + String igniteHome = spec.igniteHome(); + + if (igniteHome == null) + igniteHome = win ? DFLT_IGNITE_HOME_WIN : DFLT_IGNITE_HOME_LINUX; + + String script = spec.script(); + + if (script == null) + script = win ? DFLT_SCRIPT_WIN : DFLT_SCRIPT_LINUX; + + String cfg = spec.configuration(); + + if (cfg == null) + cfg = ""; + + String startNodeCmd; + String scriptOutputFileName = FILE_NAME_DATE_FORMAT.format(new Date()) + '-' + + UUID.randomUUID().toString().substring(0, 8) + ".log"; + + if (win) { + String logDir = igniteHome + '\\' + LOG_DIR_WIN; + String tmpDir = env(ses, "%TMP%", logDir); + String scriptOutputDir = tmpDir + "\\ignite-startNodes"; + + shell(ses, "mkdir " + logDir); + shell(ses, "mkdir " + scriptOutputDir); + + UUID id = UUID.randomUUID(); + + String svcName = "Ignite-" + id; + String svcPath = igniteHome + '\\' + SVC_EXE; + + startNodeCmd = new SB(). + a("cmd /c if exist \"").a(svcPath).a("\""). + a(" sc create ").a(svcName). + a(" binPath= \"").a(svcPath).a("\""). + a(" && "). + a("sc start ").a(svcName). + a(" ").a(svcName). + a(" \"").a(igniteHome).a('\\').a(script). + a(" ").a(cfg).a("\""). + a(" \"").a(logDir).a("\\ignite.").a(id). + a(".log\" > ").a(scriptOutputDir).a("\\").a(scriptOutputFileName). + toString(); + } + else { // Assume Unix. + int spaceIdx = script.indexOf(' '); + + String scriptPath = spaceIdx > -1 ? script.substring(0, spaceIdx) : script; + String scriptArgs = spaceIdx > -1 ? script.substring(spaceIdx + 1) : ""; + String rmtLogArgs = buildRemoteLogArguments(spec.username(), spec.host()); + String tmpDir = env(ses, "$TMPDIR", "/tmp/"); + String scriptOutputDir = tmpDir + "ignite-startNodes"; + + shell(ses, "mkdir " + scriptOutputDir); + + // Mac os don't support ~ in double quotes. Trying get home path from remote system. + if (igniteHome.startsWith("~")) { + String homeDir = env(ses, "$HOME", "~"); + + igniteHome = igniteHome.replaceFirst("~", homeDir); + } + + startNodeCmd = new SB(). + // Console output is consumed, started nodes must use Ignite file appenders for log. + a("nohup "). + a("\"").a(igniteHome).a('/').a(scriptPath).a("\""). + a(" ").a(scriptArgs). + a(!cfg.isEmpty() ? " \"" : "").a(cfg).a(!cfg.isEmpty() ? "\"" : ""). + a(rmtLogArgs). + a(" > ").a(scriptOutputDir).a("/").a(scriptOutputFileName).a(" 2>& 1 &"). + toString(); + } + + info("Starting remote node with SSH command: " + startNodeCmd, spec.logger(), log); + + shell(ses, startNodeCmd); + + return new GridTuple3<>(spec.host(), true, null); + } + catch (IgniteInterruptedCheckedException e) { + return new GridTuple3<>(spec.host(), false, e.getMessage()); + } + catch (Exception e) { + return new GridTuple3<>(spec.host(), false, X.getFullStackTrace(e)); + } + finally { + if (ses != null && ses.isConnected()) + ses.disconnect(); + } + } + + /** + * Executes command using {@code shell} channel. + * + * @param ses SSH session. + * @param cmd Command. + * @throws JSchException In case of SSH error. + * @throws IOException If IO error occurs. + * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread was interrupted while waiting. + */ + private void shell(Session ses, String cmd) throws JSchException, IOException, IgniteInterruptedCheckedException { + ChannelShell ch = null; + + try { + ch = (ChannelShell)ses.openChannel("shell"); + + ch.connect(); + + try (PrintStream out = new PrintStream(ch.getOutputStream(), true)) { + out.println(cmd); + + U.sleep(1000); + } + } + finally { + if (ch != null && ch.isConnected()) + ch.disconnect(); + } + } + + /** + * Checks whether host is running Windows OS. + * + * @param ses SSH session. + * @return Whether host is running Windows OS. + * @throws JSchException In case of SSH error. + */ + private boolean isWindows(Session ses) throws JSchException { + try { + return exec(ses, "cmd.exe") != null; + } + catch (IOException ignored) { + return false; + } + } + + /** + * Gets the value of the specified environment variable. + * + * @param ses SSH session. + * @param name environment variable name. + * @param dflt default value. + * @return environment variable value. + * @throws JSchException In case of SSH error. + */ + private String env(Session ses, String name, String dflt) throws JSchException { + try { + return exec(ses, "echo " + name); + } + catch (IOException ignored) { + return dflt; + } + } + + /** + * Gets the value of the specified environment variable. + * + * @param ses SSH session. + * @param cmd environment variable name. + * @return environment variable value. + * @throws JSchException In case of SSH error. + * @throws IOException If failed. + */ + private String exec(Session ses, String cmd) throws JSchException, IOException { + ChannelExec ch = null; + + try { + ch = (ChannelExec)ses.openChannel("exec"); + + ch.setCommand(cmd); + + ch.connect(); + + try (BufferedReader reader = new BufferedReader(new InputStreamReader(ch.getInputStream()))) { + return reader.readLine(); + } + } + finally { + if (ch != null && ch.isConnected()) + ch.disconnect(); + } + } + + /** + * Builds ignite.sh attributes to set up SSH username and password and log directory for started node. + * + * @param username SSH user name. + * @param host Host. + * @return {@code ignite.sh} script arguments. + */ + private String buildRemoteLogArguments(String username, String host) { + assert username != null; + assert host != null; + + SB sb = new SB(); + + sb.a(" -J-D").a(IGNITE_SSH_HOST).a("=\"").a(host).a("\""). + a(" -J-D").a(IGNITE_SSH_USER_NAME).a("=\"").a(username).a("\""); + + return sb.toString(); + } + + /** + * @param log Logger. + * @return This callable for chaining method calls. + */ + public IgniteNodeCallable setLogger(IgniteLogger log) { + this.log = log; + + return this; + } + + /** + * Log info message to loggers. + * + * @param msg Message text. + * @param loggers Loggers. + */ + private void info(String msg, IgniteLogger... loggers) { + for (IgniteLogger logger : loggers) + if (logger != null && logger.isInfoEnabled()) + logger.info(msg); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java ---------------------------------------------------------------------- diff --cc modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java index 0000000,0e9ee8a..f424c0d mode 000000,100644..100644 --- a/modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java +++ b/modules/ssh/src/test/java/org/apache/ignite/internal/IgniteProjectionStartStopRestartSelfTest.java @@@ -1,0 -1,1032 +1,1032 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal; + + import org.apache.ignite.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.events.*; + import org.apache.ignite.internal.util.lang.*; + import org.apache.ignite.internal.util.nodestart.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.testframework.junits.common.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.nio.file.*; + import java.util.*; + import java.util.concurrent.*; + import java.util.concurrent.atomic.*; + + import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.events.IgniteEventType.*; -import static org.apache.ignite.internal.util.nodestart.IgniteNodeStartUtils.*; ++import static org.apache.ignite.events.EventType.*; ++import static org.apache.ignite.internal.util.nodestart.GridNodeStartUtils.*; + + /** + * Tests for {@code startNodes(..)}, {@code stopNodes(..)} + * and {@code restartNodes(..)} methods. + * <p> + * {@code tests.properties} file must specify username ({@code ssh.username} property) + * and one (and only one) of password ({@code ssh.password} property) or + * private key path ({@code ssh.key} property). + */ + @SuppressWarnings("ConstantConditions") + public class IgniteProjectionStartStopRestartSelfTest extends GridCommonAbstractTest { + /** */ + private static final String SSH_UNAME = System.getenv("test.ssh.username"); + + /** */ + private static final String SSH_PWD = System.getenv("test.ssh.password"); + + /** */ + private static final String SSH_KEY = System.getenv("ssh.key"); + + /** */ + private static final String CUSTOM_SCRIPT_WIN = "modules/core/src/test/bin/start-nodes-custom.bat"; + + /** */ + private static final String CUSTOM_SCRIPT_LINUX = "modules/core/src/test/bin/start-nodes-custom.sh"; + + /** */ + private static final String CFG_NO_ATTR = "modules/core/src/test/config/spring-start-nodes.xml"; + + /** */ + private static final String CFG_ATTR = "modules/core/src/test/config/spring-start-nodes-attr.xml"; + + /** */ + private static final String CUSTOM_CFG_ATTR_KEY = "grid.node.ssh.started"; + + /** */ + private static final String CUSTOM_CFG_ATTR_VAL = "true"; + + /** */ + private static final long WAIT_TIMEOUT = 40 * 1000; + + /** */ + private String pwd; + + /** */ + private File key; + + /** */ + private Ignite ignite; + + /** */ + private static final String HOST = "127.0.0.1"; + + /** */ + private final AtomicInteger joinedCnt = new AtomicInteger(); + + /** */ + private final AtomicInteger leftCnt = new AtomicInteger(); + + /** */ + private volatile CountDownLatch joinedLatch; + + /** */ + private volatile CountDownLatch leftLatch; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + if (SSH_KEY != null) { + key = new File(SSH_KEY); + + assert key.exists() : "Private key doesn't exist: " + key.getAbsolutePath(); + assert key.isFile() : "Private key is not a file: " + key.getAbsolutePath(); + } + else + pwd = SSH_PWD; + + log.info("Username: " + SSH_UNAME); + log.info("Password: " + pwd); + log.info("Key path: " + key); + + G.setDaemon(true); + + ignite = G.start(CFG_NO_ATTR); + + G.setDaemon(false); + - ignite.events().localListen(new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { ++ ignite.events().localListen(new IgnitePredicate<Event>() { ++ @Override public boolean apply(Event evt) { + info("Received event: " + evt.shortDisplay()); + + if (evt.type() == EVT_NODE_JOINED) { + joinedCnt.incrementAndGet(); + + if (joinedLatch != null) + joinedLatch.countDown(); + } else if (evt.type() == EVT_NODE_LEFT) { + leftCnt.incrementAndGet(); + + if (leftLatch != null) + leftLatch.countDown(); + } + + return true; + } + }, EVT_NODE_JOINED, EVT_NODE_LEFT); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + if (!ignite.cluster().nodes().isEmpty()) { + leftLatch = new CountDownLatch(ignite.cluster().nodes().size()); + + ignite.cluster().stopNodes(); + + assert leftLatch.await(WAIT_TIMEOUT, MILLISECONDS); + } + + boolean wasEmpty = ignite.cluster().nodes().isEmpty(); + + G.stop(true); + + joinedCnt.set(0); + leftCnt.set(0); + + joinedLatch = null; + leftLatch = null; + + assert wasEmpty : "grid.isEmpty() returned false after all nodes were stopped [nodes=" + ignite.cluster().nodes() + ']'; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 90 * 1000; + } + + /** + * @throws Exception If failed. + */ + public void testStartOneNode() throws Exception { + joinedLatch = new CountDownLatch(1); + + Collection<GridTuple3<String, Boolean, String>> res = + startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 1, U.getGridGainHome(), CFG_NO_ATTR, null), + null, false, 0, 16); + + assert res.size() == 1; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert joinedCnt.get() == 1; + assert leftCnt.get() == 0; + + assert ignite.cluster().nodes().size() == 1; + } + + /** + * @throws Exception If failed. + */ + public void testStartThreeNodes() throws Exception { + joinedLatch = new CountDownLatch(3); + + Collection<GridTuple3<String, Boolean, String>> res = + startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 3, U.getGridGainHome(), CFG_NO_ATTR, null), + null, false, DFLT_TIMEOUT, 1); + + assert res.size() == 3; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert joinedCnt.get() == 3; + assert leftCnt.get() == 0; + + assert ignite.cluster().nodes().size() == 3; + } + + /** + * @throws Exception If failed. + */ + public void testStartThreeNodesAndDoEmptyCall() throws Exception { + joinedLatch = new CountDownLatch(3); + + Collection<GridTuple3<String, Boolean, String>> res = + startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 3, U.getGridGainHome(), CFG_NO_ATTR, null), + null, false, 0, 16); + + assert res.size() == 3; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert joinedCnt.get() == 3; + assert leftCnt.get() == 0; + + assert ignite.cluster().nodes().size() == 3; + + res = startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 3, U.getGridGainHome(), CFG_NO_ATTR, null), + null, false, 0, 16); + + assert res.isEmpty(); + + assert joinedCnt.get() == 3; + assert leftCnt.get() == 0; + + assert ignite.cluster().nodes().size() == 3; + } + + /** + * @throws Exception If failed. + */ + public void testStartThreeNodesAndTryToStartOneNode() throws Exception { + joinedLatch = new CountDownLatch(3); + + Collection<GridTuple3<String, Boolean, String>> res = + startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 3, U.getGridGainHome(), CFG_NO_ATTR, null), + null, false, 0, 16); + + assert res.size() == 3; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert joinedCnt.get() == 3; + assert leftCnt.get() == 0; + + assert ignite.cluster().nodes().size() == 3; + + res = startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 1, U.getGridGainHome(), CFG_NO_ATTR, null), + null, false, 0, 16); + + assert res.isEmpty(); + + assert joinedCnt.get() == 3; + assert leftCnt.get() == 0; + + assert ignite.cluster().nodes().size() == 3; + } + + /** + * @throws Exception If failed. + */ + public void testStartFiveNodesInTwoCalls() throws Exception { + joinedLatch = new CountDownLatch(3); + + Collection<GridTuple3<String, Boolean, String>> res = + startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 3, U.getGridGainHome(), CFG_NO_ATTR, null), + null, false, 0, 16); + + assert res.size() == 3; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert joinedCnt.get() == 3; + assert leftCnt.get() == 0; + + assert ignite.cluster().nodes().size() == 3; + + joinedLatch = new CountDownLatch(2); + + res = startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 5, U.getGridGainHome(), CFG_NO_ATTR, null), + null, false, 0, 16); + + assert res.size() == 2; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert joinedCnt.get() == 5; + assert leftCnt.get() == 0; + + assert ignite.cluster().nodes().size() == 5; + } + + /** + * @throws Exception If failed. + */ + public void testStartFiveWithTwoSpecs() throws Exception { + joinedLatch = new CountDownLatch(5); + + Collection<GridTuple3<String, Boolean, String>> res = + startNodes(ignite.cluster(), + F.asList(map(HOST, SSH_UNAME, pwd, key, 2, U.getGridGainHome(), CFG_NO_ATTR, null), + map(HOST, SSH_UNAME, pwd, key, 3, U.getGridGainHome(), CFG_NO_ATTR, null)), + null, false, 0, 16); + + assert res.size() == 5; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert joinedCnt.get() == 5; + assert leftCnt.get() == 0; + + assert ignite.cluster().nodes().size() == 5; + } + + /** + * @throws Exception If failed. + */ + public void testStartThreeNodesAndRestart() throws Exception { + joinedLatch = new CountDownLatch(3); + + Collection<GridTuple3<String, Boolean, String>> res = + startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 3, U.getGridGainHome(), CFG_NO_ATTR, null), + null, false, 0, 16); + + assert res.size() == 3; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert joinedCnt.get() == 3; + assert leftCnt.get() == 0; + + assert ignite.cluster().nodes().size() == 3; + + joinedLatch = new CountDownLatch(3); + leftLatch = new CountDownLatch(3); + + res = startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 3, U.getGridGainHome(), CFG_NO_ATTR, null), + null, true, 0, 16); + + assert res.size() == 3; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + assert leftLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert joinedCnt.get() == 6; + assert leftCnt.get() == 3; + + assert ignite.cluster().nodes().size() == 3; + } + + /** + * @throws Exception If failed. + */ + public void testCustomScript() throws Exception { + joinedLatch = new CountDownLatch(1); + + String script = U.isWindows() ? CUSTOM_SCRIPT_WIN : CUSTOM_SCRIPT_LINUX; + + script = Paths.get(U.getGridGainHome()).relativize(U.resolveGridGainPath(script).toPath()).toString(); + + Collection<GridTuple3<String, Boolean, String>> res = + startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 1, U.getGridGainHome(), null, script), + null, false, 0, 16); + + assert res.size() == 1; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert joinedCnt.get() == 1; + assert leftCnt.get() == 0; + + assert ignite.cluster().nodes().size() == 1; + + assert CUSTOM_CFG_ATTR_VAL.equals(F.first(ignite.cluster().nodes()).<String>attribute(CUSTOM_CFG_ATTR_KEY)); + } + + /** + * @throws Exception If failed. + */ + public void testStopNodes() throws Exception { + joinedLatch = new CountDownLatch(3); + + Collection<GridTuple3<String, Boolean, String>> res = + startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, null, 3, U.getGridGainHome(), CFG_NO_ATTR, + null), null, false, 0, 16); + + assert res.size() == 3; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().size() == 3; + + leftLatch = new CountDownLatch(3); + + ignite.cluster().stopNodes(); + + assert leftLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().isEmpty(); + } + + /** + * @throws Exception If failed. + */ + public void testStopNodesFiltered() throws Exception { + joinedLatch = new CountDownLatch(2); + + Collection<GridTuple3<String, Boolean, String>> res = + startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 2, U.getGridGainHome(), CFG_ATTR, null), + null, false, 0, 16); + + assert res.size() == 2; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + joinedLatch = new CountDownLatch(1); + + res = startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 3, U.getGridGainHome(), CFG_NO_ATTR, null), + null, false, 0, 16); + + assert res.size() == 1; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().size() == 3; + + leftLatch = new CountDownLatch(2); + + Collection<UUID> ids = F.transform(ignite.cluster().forAttribute(CUSTOM_CFG_ATTR_KEY, CUSTOM_CFG_ATTR_VAL).nodes(), + new IgniteClosure<ClusterNode, UUID>() { + @Override public UUID apply(ClusterNode node) { + return node.id(); + } + }); + + ignite.cluster().forAttribute(CUSTOM_CFG_ATTR_KEY, CUSTOM_CFG_ATTR_VAL).nodes(); + + ignite.cluster().stopNodes(ids); + + assert leftLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().size() == 1; + } + + /** + * @throws Exception If failed. + */ + public void testStopNodeById() throws Exception { + joinedLatch = new CountDownLatch(3); + + Collection<GridTuple3<String, Boolean, String>> res = + startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 3, U.getGridGainHome(), CFG_NO_ATTR, null), + null, false, 0, 16); + + assert res.size() == 3; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().size() == 3; + + leftLatch = new CountDownLatch(1); + + ignite.cluster().stopNodes(Collections.singleton(F.first(ignite.cluster().forRemotes().nodes()).id())); + + assert leftLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().size() == 2; + } + + /** + * @throws Exception If failed. + */ + public void testStopNodesByIds() throws Exception { + joinedLatch = new CountDownLatch(3); + + Collection<GridTuple3<String, Boolean, String>> res = + startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 3, U.getGridGainHome(), CFG_NO_ATTR, null), + null, false, 0, 16); + + assert res.size() == 3; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().size() == 3; + + leftLatch = new CountDownLatch(2); + + Iterator<ClusterNode> it = ignite.cluster().nodes().iterator(); + + Collection<UUID> ids = new HashSet<>(); + + ids.add(it.next().id()); + ids.add(it.next().id()); + + ignite.cluster().stopNodes(ids); + + assert leftLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().size() == 1; + } + + /** + * @throws Exception If failed. + */ + public void testStopNodesByIdsC() throws Exception { + joinedLatch = new CountDownLatch(3); + + Collection<GridTuple3<String, Boolean, String>> res = + startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 3, U.getGridGainHome(), CFG_NO_ATTR, null), + null, false, 0, 16); + + assert res.size() == 3; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().size() == 3; + + leftLatch = new CountDownLatch(2); + + Iterator<ClusterNode> it = ignite.cluster().nodes().iterator(); + + ignite.cluster().stopNodes(F.asList(it.next().id(), it.next().id())); + + assert leftLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().size() == 1; + } + + /** + * @throws Exception If failed. + */ + public void testRestartNodes() throws Exception { + joinedLatch = new CountDownLatch(3); + + Collection<GridTuple3<String, Boolean, String>> res = + startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 3, U.getGridGainHome(), CFG_NO_ATTR, null), + null, false, 0, 16); + + assert res.size() == 3; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().size() == 3; + + joinedLatch = new CountDownLatch(3); + leftLatch = new CountDownLatch(3); + + ignite.cluster().restartNodes(); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + assert leftLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().size() == 3; + } + + /** + * @throws Exception If failed. + */ + public void testRestartNodesFiltered() throws Exception { + joinedLatch = new CountDownLatch(2); + + Collection<GridTuple3<String, Boolean, String>> res = + startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 2, U.getGridGainHome(), CFG_ATTR, null), + null, false, 0, 16); + + assert res.size() == 2; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + joinedLatch = new CountDownLatch(1); + + res = startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 3, U.getGridGainHome(), CFG_NO_ATTR, null), + null, false, 0, 16); + + assert res.size() == 1; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().size() == 3; + + joinedLatch = new CountDownLatch(2); + leftLatch = new CountDownLatch(2); + + X.println("Restarting nodes with " + CUSTOM_CFG_ATTR_KEY); + + Collection<UUID> ids = F.transform(ignite.cluster().forAttribute(CUSTOM_CFG_ATTR_KEY, CUSTOM_CFG_ATTR_VAL).nodes(), + new IgniteClosure<ClusterNode, UUID>() { + @Override public UUID apply(ClusterNode node) { + return node.id(); + } + } + ); + + ignite.cluster().restartNodes(ids); + + assert leftLatch.await(WAIT_TIMEOUT, MILLISECONDS); + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().size() == 3; + } + + /** + * @throws Exception If failed. + */ + public void testRestartNodeById() throws Exception { + joinedLatch = new CountDownLatch(3); + + Collection<GridTuple3<String, Boolean, String>> res = + startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 3, U.getGridGainHome(), CFG_NO_ATTR, null), + null, false, 0, 16); + + assert res.size() == 3; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().size() == 3; + + joinedLatch = new CountDownLatch(1); + leftLatch = new CountDownLatch(1); + + ignite.cluster().restartNodes(Collections.singleton(F.first(ignite.cluster().forRemotes().nodes()).id())); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + assert leftLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().size() == 3; + } + + /** + * @throws Exception If failed. + */ + public void testRestartNodesByIds() throws Exception { + joinedLatch = new CountDownLatch(3); + + Collection<GridTuple3<String, Boolean, String>> res = + startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 3, U.getGridGainHome(), CFG_NO_ATTR, null), + null, false, 0, 16); + + assert res.size() == 3; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().size() == 3; + + joinedLatch = new CountDownLatch(2); + leftLatch = new CountDownLatch(2); + + Iterator<ClusterNode> it = ignite.cluster().nodes().iterator(); + + ignite.cluster().restartNodes(F.asList(it.next().id(), it.next().id())); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + assert leftLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().size() == 3; + } + + /** + * @throws Exception If failed. + */ + public void testRestartNodesByIdsC() throws Exception { + joinedLatch = new CountDownLatch(3); + + Collection<GridTuple3<String, Boolean, String>> res = + startNodes(ignite.cluster(), + maps(Collections.singleton(HOST), SSH_UNAME, pwd, key, 3, U.getGridGainHome(), CFG_NO_ATTR, null), + null, false, 0, 16); + + assert res.size() == 3; + + F.forEach(res, new CI1<GridTuple3<String, Boolean, String>>() { + @Override public void apply(GridTuple3<String, Boolean, String> t) { + assert t.get1().equals(HOST); + + if (!t.get2()) + throw new IgniteException(t.get3()); + } + }); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().size() == 3; + + joinedLatch = new CountDownLatch(2); + leftLatch = new CountDownLatch(2); + + Iterator<ClusterNode> it = ignite.cluster().nodes().iterator(); + + ignite.cluster().restartNodes(F.asList(it.next().id(), it.next().id())); + + assert joinedLatch.await(WAIT_TIMEOUT, MILLISECONDS); + assert leftLatch.await(WAIT_TIMEOUT, MILLISECONDS); + + assert ignite.cluster().nodes().size() == 3; + } + + /** + * @param host Hostname. + * @param uname Username. + * @param passwd Password. + * @param key Private key file. + * @param nodes Number of nodes. + * @param igniteHome GridGain home. + * @param cfg Configuration file path. + * @param script Startup script path. + * @return Parameters map. + */ + private Map<String, Object> map( + String host, + @Nullable String uname, + @Nullable String passwd, + @Nullable File key, + @Nullable Integer nodes, + @Nullable String igniteHome, + @Nullable String cfg, + @Nullable String script) { + assert host != null; + + Map<String, Object> params = new HashMap<>(); + + params.put(IgniteNodeStartUtils.HOST, host); + params.put(UNAME, uname); + params.put(PASSWD, passwd); + params.put(KEY, key); + params.put(NODES, nodes); + params.put(IGNITE_HOME, igniteHome); + params.put(CFG, cfg); + params.put(SCRIPT, script); + + return params; + } + + /** + * @param hosts Hostnames. + * @param uname Username. + * @param passwd Password. + * @param key Private key file. + * @param nodes Number of nodes. + * @param igniteHome GridGain home. + * @param cfg Configuration file path. + * @param script Startup script path. + * @return Parameters map. + */ + private Collection<Map<String, Object>> maps( + Collection<String> hosts, + @Nullable String uname, + @Nullable String passwd, + @Nullable File key, + @Nullable Integer nodes, + @Nullable String igniteHome, + @Nullable String cfg, + @Nullable String script) { + assert HOST != null; + + Collection<Map<String, Object>> maps = new ArrayList<>(hosts.size()); + + for (String host : hosts) { + Map<String, Object> params = new HashMap<>(); + + params.put(IgniteNodeStartUtils.HOST, host); + params.put(UNAME, uname); + params.put(PASSWD, passwd); + params.put(KEY, key); + params.put(NODES, nodes); + params.put(IGNITE_HOME, igniteHome); + params.put(CFG, cfg); + params.put(SCRIPT, script); + + maps.add(params); + } + + return maps; + } + + /** + * @param name Filename. + * @return Whether name belongs to log file. + */ + private boolean isSshNodeLogName(String name) { + return name.matches("gridgain.[0-9a-z-]+.log"); + } + + /** + * @param cluster Cluster. + * @param hosts Hosts. + * @param dflts Default. + * @param restart Restart flag. + * @param timeout Timeout. + * @param maxConn Maximum connections. + * @return Results collection. + * @throws IgniteCheckedException If failed. + */ + private Collection<GridTuple3<String, Boolean, String>> startNodes(IgniteCluster cluster, + Collection<Map<String, Object>> hosts, + @Nullable Map<String, Object> dflts, + boolean restart, + int timeout, + int maxConn) throws IgniteCheckedException { + cluster = cluster.withAsync(); + + assertNull(cluster.startNodes(hosts, dflts, restart, timeout, maxConn)); + + return cluster.<Collection<GridTuple3<String, Boolean, String>>>future().get(WAIT_TIMEOUT); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/33834a34/modules/yardstick/config/ignite-store-config.xml ----------------------------------------------------------------------