This is an automated email from the ASF dual-hosted git repository. nizhikov pushed a commit to branch IGNITE-19950-snapshot-merge in repository https://gitbox.apache.org/repos/asf/ignite.git
commit e698ed680093373bc913ded52fab4c6e5b197298 Author: nizhikov <[email protected]> AuthorDate: Thu Jul 27 14:04:24 2023 +0300 IGNITE-19950 WIP --- .../IgniteDataTransferObjectSerDesGenerator.java | 696 +++++++++++++++++++++ .../java/org/apache/ignite/IgniteSnapshot.java | 7 + .../org/apache/ignite/internal/IgniteKernal.java | 2 +- .../internal/management/IgniteCommandRegistry.java | 4 +- .../internal/management/dump/DumpCommand.java | 28 + .../management/dump/DumpCreateCommand.java | 40 ++ .../management/dump/DumpCreateCommandArg.java | 71 +++ .../internal/management/dump/DumpCreateTask.java | 47 ++ .../snapshot/AbstractSnapshotFutureTask.java | 2 +- .../snapshot/IgniteSnapshotManager.java | 321 ++++++++-- .../snapshot/SnapshotOperationRequest.java | 13 +- .../snapshot/SnapshotRestoreProcess.java | 3 +- .../cache/persistence/snapshot/SnapshotSender.java | 2 +- .../snapshot/dump/DumpCacheFutureTask.java | 120 ++++ .../persistence/snapshot/dump/DumpMetadata.java | 91 +++ .../internal/processors/pool/PoolProcessor.java | 22 +- .../snapshot/IgniteCacheDumpSelfTest.java | 74 +++ .../junits/common/GridCommonAbstractTest.java | 6 +- 18 files changed, 1466 insertions(+), 83 deletions(-) diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/IgniteDataTransferObjectSerDesGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/IgniteDataTransferObjectSerDesGenerator.java new file mode 100644 index 00000000000..da0870ed206 --- /dev/null +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/IgniteDataTransferObjectSerDesGenerator.java @@ -0,0 +1,696 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.codegen; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.ignite.internal.dto.IgniteDataTransferObject; +import org.apache.ignite.internal.management.IgniteCommandRegistry; +import org.apache.ignite.internal.management.api.Command; +import org.apache.ignite.internal.management.api.CommandsRegistry; +import org.apache.ignite.internal.management.api.ComputeCommand; +import org.apache.ignite.internal.management.api.LocalCommand; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.lang.GridTuple3; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; + +import static java.lang.reflect.Modifier.isStatic; +import static java.lang.reflect.Modifier.isTransient; +import static org.apache.ignite.codegen.MessageCodeGenerator.DFLT_SRC_DIR; +import static org.apache.ignite.codegen.MessageCodeGenerator.TAB; + +/** + * This class can generate boilerplate code for classes extends {@link IgniteDataTransferObject}. + * This class can generate {@code IgniteDataTransferObject#writeExternalData(ObjectOutput)} + * and {@code IgniteDataTransferObject#readExternalData(byte, ObjectInput)}. + */ +public class IgniteDataTransferObjectSerDesGenerator { + /** */ + public static final String METHOD_JAVADOC = TAB + "/** {@inheritDoc} */"; + + /** */ + public static final String IMPORT_TOKEN = "import "; + + /** */ + public static final String IMPORT_STATIC_TOKEN = "import static "; + + /** */ + public static final String WRITE_MTD = "writeExternalData"; + + /** */ + private int methodsStart = Integer.MAX_VALUE; + + /** */ + private final Set<String> imports = new TreeSet<>(); + + /** */ + private final Set<String> staticImports = new TreeSet<>(); + + /** */ + private static final Map<Class<?>, GridTuple3<Function<Field, String>, Function<Field, String>, Boolean>> TYPE_GENS + = new HashMap<>(); + + static { + TYPE_GENS.put(byte.class, F.t( + fld -> "out.writeByte(" + outName(fld) + ");", + fld -> inName(fld) + " = in.readByte();", + false + )); + + TYPE_GENS.put(Byte.class, F.t( + fld -> "out.writeObject(" + outName(fld) + ");", + fld -> inName(fld) + " = (Byte)in.readObject();", + false + )); + + TYPE_GENS.put(byte[].class, F.t( + fld -> "U.writeByteArray(out, " + outName(fld) + ");", + fld -> inName(fld) + " = U.readByteArray(in);", + true + )); + + TYPE_GENS.put(short.class, F.t( + fld -> "out.writeShort(" + outName(fld) + ");", + fld -> inName(fld) + " = in.readShort();", + false + )); + + TYPE_GENS.put(Short.class, F.t( + fld -> "out.writeObject(" + outName(fld) + ");", + fld -> inName(fld) + " = (Short)in.readObject();", + false + )); + + TYPE_GENS.put(short[].class, F.t( + fld -> "U.writeShortArray(out, " + outName(fld) + ");", + fld -> inName(fld) + " = U.readShortArray(in);", + true + )); + + TYPE_GENS.put(int.class, F.t( + fld -> "out.writeInt(" + outName(fld) + ");", + fld -> inName(fld) + " = in.readInt();", + false + )); + + TYPE_GENS.put(Integer.class, F.t( + fld -> "out.writeObject(" + outName(fld) + ");", + fld -> inName(fld) + " = (Integer)in.readObject();", + false + )); + + TYPE_GENS.put(int[].class, F.t( + fld -> "U.writeIntArray(out, " + outName(fld) + ");", + fld -> inName(fld) + " = U.readIntArray(in);", + true + )); + + TYPE_GENS.put(long.class, F.t( + fld -> "out.writeLong(" + outName(fld) + ");", + fld -> inName(fld) + " = in.readLong();", + false + )); + + TYPE_GENS.put(Long.class, F.t( + fld -> "out.writeObject(" + outName(fld) + ");", + fld -> inName(fld) + " = (Long)in.readObject();", + false + )); + + TYPE_GENS.put(long[].class, F.t( + fld -> "U.writeLongArray(out, " + outName(fld) + ");", + fld -> inName(fld) + " = U.readLongArray(in);", + true + )); + + TYPE_GENS.put(float.class, F.t( + fld -> "out.writeFloat(" + outName(fld) + ");", + fld -> inName(fld) + " = in.readFloat();", + false + )); + + TYPE_GENS.put(Float.class, F.t( + fld -> "out.writeObject(" + outName(fld) + ");", + fld -> inName(fld) + " = (Float)in.readObject();", + false + )); + + TYPE_GENS.put(float[].class, F.t( + fld -> "U.writeFloatArray(out, " + outName(fld) + ");", + fld -> inName(fld) + " = U.readFloatArray(in);", + true + )); + + TYPE_GENS.put(double.class, F.t( + fld -> "out.writeDouble(" + outName(fld) + ");", + fld -> inName(fld) + " = in.readDouble();", + false + )); + + TYPE_GENS.put(Double.class, F.t( + fld -> "out.writeObject(" + outName(fld) + ");", + fld -> inName(fld) + " = (Double)in.readObject();", + false + )); + + TYPE_GENS.put(double[].class, F.t( + fld -> "U.writeDoubleArray(out, " + outName(fld) + ");", + fld -> inName(fld) + " = U.readDoubleArray(in);", + true + )); + + TYPE_GENS.put(boolean.class, F.t( + fld -> "out.writeBoolean(" + outName(fld) + ");", + fld -> inName(fld) + " = in.readBoolean();", + false + )); + + TYPE_GENS.put(Boolean.class, F.t( + fld -> "out.writeObject(" + outName(fld) + ");", + fld -> inName(fld) + " = (Boolean)in.readObject();", + false + )); + + TYPE_GENS.put(boolean[].class, F.t( + fld -> "U.writeBooleanArray(out, " + outName(fld) + ");", + fld -> inName(fld) + " = U.readBooleanArray(in);", + true + )); + + TYPE_GENS.put(String.class, F.t( + fld -> "U.writeString(out, " + outName(fld) + ");", + fld -> inName(fld) + " = U.readString(in);", + true + )); + + TYPE_GENS.put(UUID.class, F.t( + fld -> "U.writeUuid(out, " + outName(fld) + ");", + fld -> inName(fld) + " = U.readUuid(in);", + true + )); + + TYPE_GENS.put(IgniteUuid.class, F.t( + fld -> "U.writeIgniteUuid(out, " + outName(fld) + ");", + fld -> inName(fld) + " = U.readIgniteUuid(in);", + true + )); + + TYPE_GENS.put(Collection.class, F.t( + fld -> "U.writeCollection(out, " + outName(fld) + ");", + fld -> inName(fld) + " = U.readCollection(in);", + true + )); + + TYPE_GENS.put(List.class, F.t( + fld -> "U.writeCollection(out, " + outName(fld) + ");", + fld -> inName(fld) + " = U.readList(in);", + true + )); + + TYPE_GENS.put(Set.class, F.t( + fld -> "U.writeCollection(out, " + outName(fld) + ");", + fld -> inName(fld) + " = U.readSet(in);", + true + )); + + TYPE_GENS.put(GridCacheVersion.class, F.t( + fld -> "out.writeObject(" + outName(fld) + ");", + fld -> inName(fld) + " = (GridCacheVersion)in.readObject();", + false + )); + + TYPE_GENS.put(Map.class, F.t( + fld -> "U.writeMap(out, " + outName(fld) + ");", + fld -> fld.getName() + " = U.readMap(in);", + false + )); + } + + /** @param args Command line arguments. */ + public static void main(String[] args) { + new IgniteDataTransferObjectSerDesGenerator().generate(new IgniteCommandRegistry()); + } + + /** */ + private void generate(Command<?, ?> cmd) { + boolean generate = true; + + if (cmd instanceof CommandsRegistry) { + generate = cmd instanceof ComputeCommand || cmd instanceof LocalCommand; + + ((CommandsRegistry<?, ?>)cmd).commands().forEachRemaining(entry -> generate(entry.getValue())); + } + + if (!generate) + return; + + try { + generateAndWrite(cmd.argClass(), DFLT_SRC_DIR); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** */ + private void generateAndWrite(Class<? extends IgniteDataTransferObject> cls, String srcDir) throws IOException { + clear(); + + if (cls.getName().indexOf('$') != -1) { + System.err.println("Inner class not supported: " + cls.getName()); + + return; + } + + File file = new File(srcDir, cls.getName().replace('.', File.separatorChar) + ".java"); + + if (!file.exists() || !file.isFile()) + throw new IllegalArgumentException("Source file not found: " + file.getPath()); + + List<String> src = + removeExisting(addSerialVersionUID(Files.readAllLines(file.toPath(), StandardCharsets.UTF_8))); + + List<List<String>> readWriteMethods = generateMethods(cls); + List<List<String>> getersSeters = generateGetSet(cls); + + List<String> code = new ArrayList<>(); + + int i = 0; + + // Outputs all before imports. + for (; i < src.size(); i++) { + code.add(src.get(i)); + + if (src.get(i).startsWith("package")) + break; + } + + code.add(""); + + i++; + + for (String imp : imports) + code.add("import " + imp); + + if (!staticImports.isEmpty()) { + code.add(""); + + for (String imp : staticImports) + code.add("import static " + imp); + } + + i++; + + if (methodsStart == Integer.MAX_VALUE) // If methods not exists add them to the end. + methodsStart = src.size() - 1; + + for (; i < methodsStart; i++) + code.add(src.get(i)); + + if (!readWriteMethods.get(0).isEmpty()) { + code.add(""); + code.addAll(readWriteMethods.get(0)); + } + + if (!readWriteMethods.get(1).isEmpty()) { + code.add(""); + code.addAll(readWriteMethods.get(1)); + } + + for (int j = 0; j < getersSeters.size(); j++) { + code.add(""); + code.addAll(getersSeters.get(j)); + } + + for (; i < src.size(); i++) + code.add(src.get(i)); + + try (FileWriter writer = new FileWriter(file)) { + for (String line : code) { + writer.write(line); + writer.write('\n'); + } + } + } + + /** */ + private List<List<String>> generateGetSet(Class<? extends IgniteDataTransferObject> cls) { + List<Field> flds = fields(cls); + + if (flds.isEmpty()) + return Collections.emptyList(); + + List<List<String>> getSet = new ArrayList<>(); + + for (Field fld : flds) { + if (!geterExists(cls, fld)) { + List<String> geter = new ArrayList<>(); + + geter.add(TAB + "/** */"); + geter.add(TAB + "public " + generic(fld.getGenericType()) + " " + fld.getName() + "() {"); + geter.add(TAB + TAB + "return " + fld.getName() + ";"); + geter.add(TAB + "}"); + + getSet.add(geter); + } + + if (!seterExists(cls, fld)) { + List<String> seter = new ArrayList<>(); + + seter.add(TAB + "/** */"); + seter.add(TAB + "public void " + fld.getName() + + "(" + generic(fld.getGenericType()) + " " + fld.getName() + ") {"); + seter.add(TAB + TAB + "this." + fld.getName() + " = " + fld.getName() + ";"); + seter.add(TAB + "}"); + + getSet.add(seter); + } + } + + return getSet; + } + + /** */ + private List<String> addSerialVersionUID(List<String> src) { + int classStart = -1; + + for (int i = 0; i < src.size(); i++) { + String line = src.get(i); + + if (line.contains("private static final long serialVersionUID = ")) + return src; + + if (line.startsWith("public class ")) + classStart = i; + } + + if (classStart == -1) + return src; + + List<String> res = new ArrayList<>(src.subList(0, classStart + 1)); + + res.add(TAB + "/** */"); + res.add(TAB + "private static final long serialVersionUID = 0;"); + res.add(""); + res.addAll(src.subList(classStart + 1, src.size())); + + return res; + } + + /** */ + private List<List<String>> generateMethods(Class<? extends IgniteDataTransferObject> cls) { + List<Field> flds = fields(cls); + + if (flds.isEmpty() && writeExternalDataMethodExists(cls.getSuperclass())) + return Arrays.asList(Collections.emptyList(), Collections.emptyList()); + + List<String> write = new ArrayList<>(); + List<String> read = new ArrayList<>(); + + write.add(METHOD_JAVADOC); + write.add(TAB + "@Override protected void " + WRITE_MTD + "(ObjectOutput out) throws IOException {"); + + read.add(METHOD_JAVADOC); + read.add(TAB + "@Override protected void readExternalData(byte protoVer, ObjectInput in) " + + "throws IOException, ClassNotFoundException {"); + + addImport(IOException.class); + addImport(ObjectOutput.class); + addImport(ObjectInput.class); + + if (flds.isEmpty()) { + write.add(TAB + TAB + "//No-op."); + read.add(TAB + TAB + "//No-op."); + } + else if (writeExternalDataMethodExists(cls.getSuperclass())) { + write.add(TAB + TAB + "super." + WRITE_MTD + "(out);"); + write.add(""); + + read.add(TAB + TAB + "super.readExternalData(protoVer, in);"); + read.add(""); + } + + for (Field fld : flds) { + String name = fld.getName(); + + if (Enum.class.isAssignableFrom(fld.getType())) { + addImport(U.class); + + write.add(TAB + TAB + "U.writeEnum(out, " + (name.equals("out") ? "this.out." : "") + name + ");"); + read.add(TAB + TAB + (name.equals("in") ? "this." : "") + name + + " = U.readEnum(in, " + fld.getType().getSimpleName() + ".class);"); + } + else if (fld.getType().isArray() && !fld.getType().getComponentType().isPrimitive()) { + addImport(U.class); + addImport(fld.getType().getComponentType()); + + write.add(TAB + TAB + "U.writeArray(out, " + (name.equals("out") ? "this.out." : "") + name + ");"); + read.add(TAB + TAB + (name.equals("in") ? "this." : "") + name + + " = U.readArray(in, " + fld.getType().getComponentType().getSimpleName() + ".class);"); + } + else { + GridTuple3<Function<Field, String>, Function<Field, String>, Boolean> gen + = TYPE_GENS.get(fld.getType()); + + if (gen == null) + throw new IllegalArgumentException(fld.getType() + " not supported[cls=" + cls.getName() + ']'); + + if (gen.get3()) + addImport(U.class); + + write.add(TAB + TAB + gen.get1().apply(fld)); + read.add(TAB + TAB + gen.get2().apply(fld)); + } + } + + write.add(TAB + "}"); + read.add(TAB + "}"); + + return Arrays.asList(write, read); + } + + /** */ + private static String generic(Type type0) { + if (!(type0 instanceof ParameterizedType)) + return simpleName(type0.getTypeName()); + + ParameterizedType type = (ParameterizedType)type0; + + Type[] typeArgs = type.getActualTypeArguments(); + + StringBuffer generic = new StringBuffer(simpleName(type.getRawType().getTypeName())); + + generic.append('<'); + + for (int i = 0; i < typeArgs.length; i++) { + if (i != 0) + generic.append(", "); + + generic.append(generic(typeArgs[i])); + } + + generic.append('>'); + + return generic.toString(); + } + + /** */ + private static String simpleName(String type) { + int idx = type.lastIndexOf('.'); + + if (idx == -1) + return type; + + return type.substring(idx + 1); + } + + /** */ + private static String outName(Field fld) { + return (fld.getName().equals("out") ? "this." : "") + fld.getName(); + } + + /** */ + private static String inName(Field fld) { + return (fld.getName().equals("in") ? "this." : "") + fld.getName(); + } + + /** */ + private static List<Field> fields(Class<? extends IgniteDataTransferObject> cls) { + List<Field> flds = Arrays.stream(cls.getDeclaredFields()) + .filter(fld -> { + int mod = fld.getModifiers(); + + return !isStatic(mod) && !isTransient(mod); + }) + .collect(Collectors.toList()); + return flds; + } + + /** */ + private List<String> removeExisting(List<String> src) { + return removeMethod( + removeMethod( + collectAndRemoveImports(src), + WRITE_MTD + ), + "readExternalData" + ); + } + + /** */ + private List<String> removeMethod(List<String> src, String methodName) { + int start = -1; + int finish = -1; + int bracketCnt = -1; + + for (int i = 0; i < src.size(); i++) { + String line = src.get(i); + + if (line.contains(methodName) && line.endsWith("{")) { + assert src.get(i - 1).equals(METHOD_JAVADOC); + + // One line for comment and one for empty line between methods. + start = i - 2; + bracketCnt = 1; + } + else if (start != -1) { + bracketCnt += counfOf(line, '{') - counfOf(line, '}'); + + if (bracketCnt < 0) + throw new IllegalStateException("Wrong brackets count"); + + if (bracketCnt == 0) { + finish = i; + break; + } + } + } + + if (start == -1 || finish == -1) + return src; + + methodsStart = start; + + List<String> res = new ArrayList<>(src.subList(0, start)); + + res.addAll(src.subList(finish + 1, src.size())); + + return res; + } + + /** */ + private List<String> collectAndRemoveImports(List<String> src) { + return src.stream() + .peek(line -> { + if (line.startsWith(IMPORT_STATIC_TOKEN)) + staticImports.add(line.substring(IMPORT_STATIC_TOKEN.length())); + else if (line.startsWith(IMPORT_TOKEN)) + imports.add(line.substring(IMPORT_TOKEN.length())); + }) + .filter(line -> !line.startsWith(IMPORT_TOKEN)) + .collect(Collectors.toList()); + } + + /** */ + private int counfOf(String line, char ch) { + int cnt = 0; + int idx = line.indexOf(ch); + + while (idx != -1) { + cnt++; + idx = line.indexOf(ch, idx + 1); + } + + return cnt; + } + + /** */ + private void addImport(Class<?> cls) { + if ("java.lang".equals(cls.getPackage().getName())) + return; + + imports.add(cls.getName() + ';'); + } + + /** */ + private void clear() { + methodsStart = Integer.MAX_VALUE; + imports.clear(); + staticImports.clear(); + } + + /** */ + private static boolean geterExists(Class<? extends IgniteDataTransferObject> cls, Field fld) { + try { + return cls.getMethod(fld.getName()).getReturnType() == fld.getType(); + } + catch (NoSuchMethodException e) { + return false; + } + } + + /** */ + private static boolean seterExists(Class<? extends IgniteDataTransferObject> cls, Field fld) { + try { + return cls.getMethod(fld.getName(), fld.getType()).getReturnType() == void.class; + } + catch (NoSuchMethodException e) { + return false; + } + } + + /** */ + private static boolean writeExternalDataMethodExists(Class<?> cls) { + while (cls != IgniteDataTransferObject.class) { + try { + Method mtd = cls.getDeclaredMethod(WRITE_MTD, ObjectOutput.class); + + return !Modifier.isAbstract(mtd.getModifiers()); + } + catch (NoSuchMethodException e) { + cls = cls.getSuperclass(); + } + } + + return false; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java index 2f6cc9f4988..cd890667e7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSnapshot.java @@ -95,4 +95,11 @@ public interface IgniteSnapshot { * future will be {@code false} if the restore process with the specified snapshot name is not running at all. */ public IgniteFuture<Boolean> cancelSnapshotRestore(String name); + + /** + * @param name Dump name. + * @param cacheGroupNames Cache group names to be dumped or {@code null} to dump all cache groups. + * @return Future which will be completed when dump ends. + */ + public IgniteFuture<Void> createDump(String name, @Nullable Collection<String> cacheGroupNames); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index e61a1a8772f..403022aa8b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -1091,7 +1091,7 @@ public class IgniteKernal implements IgniteEx, Externalizable { startProcessor(new GridTaskSessionProcessor(ctx)); startProcessor(new GridJobProcessor(ctx)); startProcessor(new GridTaskProcessor(ctx)); - startProcessor((GridProcessor)SCHEDULE.createOptional(ctx)); + startProcessor(SCHEDULE.createOptional(ctx)); startProcessor(createComponent(IgniteRestProcessor.class, ctx)); startProcessor(new DataStreamProcessor(ctx)); startProcessor(new GridContinuousProcessor(ctx)); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/IgniteCommandRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/management/IgniteCommandRegistry.java index 7f4637e0843..9e40239041c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/IgniteCommandRegistry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/IgniteCommandRegistry.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.management.cdc.CdcCommand; import org.apache.ignite.internal.management.consistency.ConsistencyCommand; import org.apache.ignite.internal.management.defragmentation.DefragmentationCommand; import org.apache.ignite.internal.management.diagnostic.DiagnosticCommand; +import org.apache.ignite.internal.management.dump.DumpCommand; import org.apache.ignite.internal.management.encryption.EncryptionCommand; import org.apache.ignite.internal.management.kill.KillCommand; import org.apache.ignite.internal.management.meta.MetaCommand; @@ -71,7 +72,8 @@ public class IgniteCommandRegistry extends CommandRegistryImpl<NoArg, Void> { new DefragmentationCommand(), new PerformanceStatisticsCommand(), new ConsistencyCommand(), - new CdcCommand() + new CdcCommand(), + new DumpCommand() ); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/dump/DumpCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/management/dump/DumpCommand.java new file mode 100644 index 00000000000..8d1d0dee67d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/dump/DumpCommand.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.management.dump; + +import org.apache.ignite.internal.management.api.CommandRegistryImpl; + +/** */ +public class DumpCommand extends CommandRegistryImpl { + /** */ + public DumpCommand() { + super(new DumpCreateCommand()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/dump/DumpCreateCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/management/dump/DumpCreateCommand.java new file mode 100644 index 00000000000..b03961ad029 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/dump/DumpCreateCommand.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.management.dump; + +import org.apache.ignite.internal.management.api.ComputeCommand; + +/** + * + */ +public class DumpCreateCommand implements ComputeCommand<DumpCreateCommandArg, Void> { + /** {@inheritDoc} */ + @Override public String description() { + return "Create a dump of cache groups. Dump is entry by entry consistent copy of cache"; + } + + /** {@inheritDoc} */ + @Override public Class<DumpCreateCommandArg> argClass() { + return DumpCreateCommandArg.class; + } + + /** {@inheritDoc} */ + @Override public Class<DumpCreateTask> taskClass() { + return DumpCreateTask.class; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/dump/DumpCreateCommandArg.java b/modules/core/src/main/java/org/apache/ignite/internal/management/dump/DumpCreateCommandArg.java new file mode 100644 index 00000000000..4d13e82b308 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/dump/DumpCreateCommandArg.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.management.dump; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.apache.ignite.internal.dto.IgniteDataTransferObject; +import org.apache.ignite.internal.management.api.Argument; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** */ +public class DumpCreateCommandArg extends IgniteDataTransferObject { + /** */ + private static final long serialVersionUID = 0; + + /** */ + @Argument(description = "Name of the dump") + private String name; + + /** */ + @Argument(description = "Groups to create dump for", optional = true) + private String[] groups; + + /** {@inheritDoc} */ + @Override protected void writeExternalData(ObjectOutput out) throws IOException { + U.writeString(out, name); + U.writeArray(out, groups); + } + + /** {@inheritDoc} */ + @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException { + name = U.readString(in); + groups = U.readArray(in, String.class); + } + + /** */ + public String name() { + return name; + } + + /** */ + public void name(String name) { + this.name = name; + } + + /** */ + public String[] groups() { + return groups; + } + + /** */ + public void groups(String[] groups) { + this.groups = groups; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/dump/DumpCreateTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/dump/DumpCreateTask.java new file mode 100644 index 00000000000..60a1cfbeb39 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/dump/DumpCreateTask.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.management.dump; + +import java.util.Arrays; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.visor.VisorJob; +import org.apache.ignite.internal.visor.VisorOneNodeTask; +import org.jetbrains.annotations.Nullable; + +/** */ +public class DumpCreateTask extends VisorOneNodeTask<DumpCreateCommandArg, Void> { + /** {@inheritDoc} */ + @Override protected VisorJob<DumpCreateCommandArg, Void> job(DumpCreateCommandArg arg) { + return new DumpCreateJob(arg); + } + + /** */ + private static class DumpCreateJob extends VisorJob<DumpCreateCommandArg, Void> { + /** */ + protected DumpCreateJob(@Nullable DumpCreateCommandArg arg) { + super(arg, false); + } + + /** {@inheritDoc} */ + @Override protected Void run(@Nullable DumpCreateCommandArg arg) throws IgniteException { + ignite.snapshot().createDump(arg.name(), arg.groups() == null ? null : Arrays.asList(arg.groups())).get(); + + return null; + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java index 7ccc2f3944c..5115077c026 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java @@ -33,7 +33,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; /** * @param <T> Type of snapshot processing result. */ -abstract class AbstractSnapshotFutureTask<T> extends GridFutureAdapter<T> { +public abstract class AbstractSnapshotFutureTask<T> extends GridFutureAdapter<T> { /** Shared context. */ protected final GridCacheSharedContext<?, ?> cctx; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index 780470bf2e2..d54be4b5a35 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -141,6 +141,8 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.Metas import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage; import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage; import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.DumpCacheFutureTask; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.dump.DumpMetadata; import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; @@ -294,6 +296,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter /** Snapshot metafile extension. */ public static final String SNAPSHOT_METAFILE_EXT = ".smf"; + /** Dump metafile extension. */ + public static final String DUMP_METAFILE_EXT = ".dmf"; + /** Snapshot temporary metafile extension. */ public static final String SNAPSHOT_METAFILE_TMP_EXT = ".tmp"; @@ -358,6 +363,12 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter /** Pattern for incremental snapshot directory names. */ public static final Pattern INC_SNP_NAME_PATTERN = U.fixedLengthNumberNamePattern(null); + /** Default relative working directory path for dump operation result. */ + public static final String DFLT_DUMPS_DIRECTORY = "dumps"; + + /** Lock file for dump directory. */ + public static final String DUMP_LOCK = "dump.lock"; + /** * Local buffer to perform copy-on-write operations with pages for {@code SnapshotFutureTask.PageStoreSerialWriter}s. * It is important to have only one buffer per thread (instead of creating each buffer per @@ -405,6 +416,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter /** Main snapshot directory to save created snapshots. */ private volatile File locSnpDir; + /** Main dump directory to save created dumps. */ + private volatile File locDumpDir; + /** * Working directory for loaded snapshots from the remote nodes and storing * temporary partition delta-files of locally started snapshot process. @@ -415,7 +429,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter private volatile FileIOFactory ioFactory = new RandomAccessFileIOFactory(); /** File store manager to create page store for restore. */ - private volatile FilePageStoreManager storeMgr; + private volatile @Nullable FilePageStoreManager storeMgr; /** File store manager to create page store for restore. */ private volatile GridLocalConfigManager locCfgMgr; @@ -524,20 +538,17 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter if (ctx.clientNode()) return; - if (!CU.isPersistenceEnabled(ctx.config())) - return; - - assert cctx.pageStore() instanceof FilePageStoreManager; - storeMgr = (FilePageStoreManager)cctx.pageStore(); locCfgMgr = cctx.cache().configManager(); pdsSettings = cctx.kernalContext().pdsFolderResolver().resolveFolders(); locSnpDir = resolveSnapshotWorkDirectory(ctx.config()); - tmpWorkDir = U.resolveWorkDirectory(storeMgr.workDir().getAbsolutePath(), DFLT_SNAPSHOT_TMP_DIR, true); + locDumpDir = resolveSnapshotWorkDirectory(ctx.config(), DFLT_DUMPS_DIRECTORY); + tmpWorkDir = U.resolveWorkDirectory(pdsSettings.persistentStoreNodePath().getAbsolutePath(), DFLT_SNAPSHOT_TMP_DIR, true); U.ensureDirectory(locSnpDir, "snapshot work directory", log); + U.ensureDirectory(locDumpDir, "snapshot work directory", log); U.ensureDirectory(tmpWorkDir, "temp directory for snapshot creation", log); ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener( @@ -816,6 +827,11 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter * @return Local snapshot directory where snapshot files are located. */ public File snapshotLocalDir(String snpName, @Nullable String snpPath) { + return snapshotLocalDir(snpName, snpPath, locSnpDir); + } + + /** */ + private File snapshotLocalDir(String snpName, @Nullable String snpPath, File locSnpDir) { assert locSnpDir != null; assert U.alphanumericUnderscore(snpName) : snpName; @@ -889,8 +905,8 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter // Executed inside discovery notifier thread, prior to firing discovery custom event, // so it is safe to set new snapshot task inside this method without synchronization. if (clusterSnpReq != null) { - return new GridFinishedFuture<>(new IgniteCheckedException("Snapshot operation has been rejected. " + - "Another snapshot operation in progress [req=" + req + ", curr=" + clusterSnpReq + ']')); + return new GridFinishedFuture<>(new IgniteCheckedException(snpMsg("Snapshot operation has been rejected. " + + "Another snapshot operation in progress [req=" + req + ", curr=" + clusterSnpReq + ']', req.dump()))); } clusterSnpReq = req; @@ -898,7 +914,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter if (req.incremental()) handleIncrementalSnapshotId(req.requestId(), cctx.discovery().topologyVersion()); - if (!CU.baselineNode(cctx.localNode(), cctx.kernalContext().state().clusterState())) + if (!CU.baselineNode(cctx.localNode(), cctx.kernalContext().state().clusterState()) && !req.dump()) return new GridFinishedFuture<>(); Set<UUID> leftNodes = new HashSet<>(req.nodes()); @@ -906,18 +922,18 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter F.node2id())); if (!leftNodes.isEmpty()) { - return new GridFinishedFuture<>(new IgniteCheckedException("Some of baseline nodes left the cluster " + - "prior to snapshot operation start: " + leftNodes)); + return new GridFinishedFuture<>(new IgniteCheckedException(snpMsg("Some of baseline nodes left the cluster " + + "prior to snapshot operation start: " + leftNodes, req.dump()))); } if (cctx.kernalContext().encryption().isMasterKeyChangeInProgress()) { - return new GridFinishedFuture<>(new IgniteCheckedException("Snapshot operation has been rejected. Master " + - "key changing process is not finished yet.")); + return new GridFinishedFuture<>(new IgniteCheckedException(snpMsg("Snapshot operation has been rejected. Master " + + "key changing process is not finished yet.", req.dump()))); } if (cctx.kernalContext().encryption().reencryptionInProgress()) { - return new GridFinishedFuture<>(new IgniteCheckedException("Snapshot operation has been rejected. Caches " + - "re-encryption process is not finished yet.")); + return new GridFinishedFuture<>(new IgniteCheckedException(snpMsg("Snapshot operation has been rejected. Caches " + + "re-encryption process is not finished yet.", req.dump()))); } List<Integer> grpIds = new ArrayList<>(F.viewReadOnly(req.groups(), CU::cacheId)); @@ -952,6 +968,8 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter return initLocalIncrementalSnapshot(req, meta); } + else if (req.dump()) + return initLocalDump(req, grpIds); else return initLocalFullSnapshot(req, grpIds, comprGrpIds, withMetaStorage); } @@ -1193,7 +1211,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter File smf = new File(snpDir, snapshotMetaFileName(cctx.localNode().consistentId().toString())); - storeSnapshotMeta(req.meta(), smf); + storeSnapshotMeta(meta, smf); log.info("Snapshot metafile has been created: " + smf.getAbsolutePath()); @@ -1205,6 +1223,64 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter }, snapshotExecutorService()); } + /** + * @param req Request + * @param grpIds0 Groups. + * @return Create dump future. + */ + private IgniteInternalFuture<SnapshotOperationResponse> initLocalDump(SnapshotOperationRequest req, List<Integer> grpIds0) { + IgniteInternalFuture<?> task0; + + List<Integer> grpIds = grpIds0.stream().filter(grpId -> cctx.cache().cacheGroup(grpId) != null).collect(Collectors.toList()); + + File dumpDir = snapshotLocalDir(req.snapshotName(), null, locDumpDir); + + if (grpIds.isEmpty()) + task0 = new GridFinishedFuture<>(Collections.emptySet()); + else { + dumpDir.mkdirs(); + + task0 = registerTask(req.snapshotName(), new DumpCacheFutureTask( + cctx, + req.snapshotName(), + req.operationalNodeId(), + req.requestId(), + req.snapshotPath(), + dumpDir, + tmpWorkDir, + ioFactory + )); + } + + return task0.chain(fut -> { + if (fut.error() != null) + throw F.wrap(fut.error()); + + Set<String> nodes = req.nodes().stream() + .map(n -> cctx.discovery().node(n).consistentId().toString()) + .collect(Collectors.toSet()); + + fut.result(); + + DumpMetadata meta = new DumpMetadata( + req.requestId(), + cctx.localNode().consistentId().toString(), + req.snapshotName(), + grpIds, + nodes + ); + + File smf = new File(dumpDir, snapshotMetaFileName(cctx.localNode().consistentId().toString())); + + storeSnapshotMeta(meta, smf); + + log.info("Dump metafile has been created: " + smf.getAbsolutePath()); + + // TODO: Do we need to invoke handlers here? + return new SnapshotOperationResponse(null); + }, snapshotExecutorService()); + } + /** * @param id Request id. * @param res Results. @@ -1380,25 +1456,34 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter if (req.incremental()) U.delete(incrementalSnapshotLocalDir(req.snapshotName(), req.snapshotPath(), req.incrementIndex())); - else - deleteSnapshot(snapshotLocalDir(req.snapshotName(), req.snapshotPath()), pdsSettings.folderName()); + else { + deleteSnapshot( + snapshotLocalDir(req.snapshotName(), req.snapshotPath(), req.dump() ? locDumpDir : locSnpDir), + pdsSettings.folderName() + ); + } } else if (!F.isEmpty(req.warnings())) { // Pass the warnings further to the next stage for the case when snapshot started from not coordinator. if (!isLocalNodeCoordinator(cctx.discovery())) snpReq.warnings(req.warnings()); - snpReq.meta().warnings(Collections.unmodifiableList(req.warnings())); + snpReq.<SnapshotMetadata>meta().warnings(Collections.unmodifiableList(req.warnings())); storeWarnings(snpReq); } - removeLastMetaStorageKey(); + if (!req.dump()) { + removeLastMetaStorageKey(); - if (req.error() == null) { - Collection<Integer> grpIds = req.groups().stream().map(CU::cacheId).collect(Collectors.toList()); + if (req.error() == null) { + Collection<Integer> grpIds = req.groups().stream().map(CU::cacheId).collect(Collectors.toList()); - enableIncrementalSnapshotsCreation(grpIds); + enableIncrementalSnapshotsCreation(grpIds); + } + } + else { + removeDumpLock(req.snapshotName()); } } catch (Exception e) { @@ -1483,8 +1568,8 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter if (!F.isEmpty(snpReq.warnings())) { String wrnsLst = U.nl() + "\t- " + String.join(U.nl() + "\t- ", snpReq.warnings()); - SnapshotWarningException wrn = new SnapshotWarningException("Snapshot task '" + - snpReq.snapshotName() + "' completed with the warnings:" + wrnsLst); + SnapshotWarningException wrn = new SnapshotWarningException(snpMsg("Snapshot task '" + + snpReq.snapshotName() + "' completed with the warnings:" + wrnsLst, snpReq.dump())); clusterSnpFut.onDone(wrn); @@ -1498,9 +1583,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter } } else if (snpReq.error() == null) { - clusterSnpFut.onDone(new IgniteCheckedException("Snapshot creation has been finished with an error. " + + clusterSnpFut.onDone(new IgniteCheckedException(snpMsg("Snapshot creation has been finished with an error. " + "Local snapshot tasks may not finished completely or finalizing results fails " + - "[fail=" + endFail + ", err=" + err + ']')); + "[fail=" + endFail + ", err=" + err + ']', snpReq.dump()))); } else clusterSnpFut.onDone(snpReq.error()); @@ -1754,6 +1839,11 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter return new IgniteFutureImpl<>(cancelSnapshot0(name)); } + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> createDump(String name, @Nullable Collection<String> cacheGroupNames) { + return createSnapshot(name, null, false, false, true, cacheGroupNames); + } + /** * @param name Snapshot name. * @@ -2114,6 +2204,16 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter return createSnapshot(name, null, true, false); } + /** */ + public IgniteFutureImpl<Void> createSnapshot( + String name, + @Nullable String snpPath, + boolean incremental, + boolean onlyPrimary + ) { + return createSnapshot(name, snpPath, incremental, onlyPrimary, false, null); + } + /** * Create a consistent copy of all persistence cache groups from the whole cluster. * @@ -2121,31 +2221,39 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter * @param snpPath Snapshot directory path. * @param incremental Incremental snapshot flag. * @param onlyPrimary If {@code true} snapshot only primary copies of partitions. + * @param dump If {@code true} cache dump must be created. + * @param cacheGroupNames Cache group names. If {@code null} then dump all groups. * @return Future which will be completed when a process ends. */ public IgniteFutureImpl<Void> createSnapshot( String name, @Nullable String snpPath, boolean incremental, - boolean onlyPrimary + boolean onlyPrimary, + boolean dump, + @Nullable Collection<String> cacheGroupNames ) { - A.notNullOrEmpty(name, "Snapshot name cannot be null or empty."); - A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_"); + A.notNullOrEmpty(name, snpMsg("Snapshot name cannot be null or empty.", dump)); + A.ensure(U.alphanumericUnderscore(name), snpMsg("Snapshot name must satisfy the following name pattern: a-zA-Z0-9_", dump)); A.ensure(!(incremental && onlyPrimary), "Only primary not supported for incremental snapshots"); + A.ensure(dump && !(incremental || onlyPrimary), "Dump not supported onlyPrimary and incremental flags"); try { cctx.kernalContext().security().authorize(ADMIN_SNAPSHOT); if (!IgniteFeatures.allNodesSupports(cctx.discovery().aliveServerNodes(), PERSISTENCE_CACHE_SNAPSHOT)) - throw new IgniteException("Not all nodes in the cluster support a snapshot operation."); + throw new IgniteException(snpMsg("Not all nodes in the cluster support a snapshot operation.", dump)); if (!cctx.kernalContext().state().clusterState().state().active()) - throw new IgniteException("Snapshot operation has been rejected. The cluster is inactive."); + throw new IgniteException(snpMsg("Snapshot operation has been rejected. The cluster is inactive.", dump)); DiscoveryDataClusterState clusterState = cctx.kernalContext().state().clusterState(); - if (!clusterState.hasBaselineTopology()) - throw new IgniteException("Snapshot operation has been rejected. The baseline topology is not configured for cluster."); + if (!clusterState.hasBaselineTopology()) { + throw new IgniteException( + snpMsg("Snapshot operation has been rejected. The baseline topology is not configured for cluster.", dump) + ); + } if (cctx.kernalContext().clientNode()) { ClusterNode crd = U.oldest(cctx.kernalContext().discovery().aliveServerNodes(), null); @@ -2156,34 +2264,32 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter return new IgniteSnapshotFutureImpl(cctx.kernalContext().closure() .callAsync( BALANCE, - new CreateSnapshotCallable(name, incremental, onlyPrimary), + new CreateSnapshotCallable(name, incremental, onlyPrimary, dump, cacheGroupNames), options(Collections.singletonList(crd)).withFailoverDisabled() )); } - if (!CU.isPersistenceEnabled(cctx.gridConfig())) { - throw new IgniteException("Create snapshot request has been rejected. " + - "Snapshots on an in-memory clusters are not allowed."); - } - ClusterSnapshotFuture snpFut0; int incIdx = -1; synchronized (snpOpMux) { if (clusterSnpFut != null && !clusterSnpFut.isDone()) { throw new IgniteException( - "Create snapshot request has been rejected. The previous snapshot operation was not completed." + snpMsg("Create snapshot request has been rejected. The previous operation was not completed.", dump) ); } - if (clusterSnpReq != null) - throw new IgniteException("Create snapshot request has been rejected. Parallel snapshot processes are not allowed."); + if (clusterSnpReq != null) { + throw new IgniteException( + snpMsg("Create snapshot request has been rejected. Parallel snapshot processes are not allowed.", dump) + ); + } boolean snpExists = localSnapshotNames(snpPath).contains(name); if (!incremental && snpExists) { - throw new IgniteException("Create snapshot request has been rejected. " + - "Snapshot with given name already exists on local node."); + throw new IgniteException(snpMsg("Create snapshot request has been rejected. " + + "Snapshot with given name already exists on local node.", dump)); } if (incremental) { @@ -2202,7 +2308,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter if (isRestoring()) { throw new IgniteException( - "Snapshot operation has been rejected. Cache group restore operation is currently in progress." + snpMsg("Snapshot operation has been rejected. Cache group restore operation is currently in progress.", dump) ); } @@ -2216,28 +2322,30 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter lastSeenSnpFut = snpFut0; } - List<String> grps = cctx.cache().persistentGroups().stream() + List<String> grps = (dump ? cctx.cache().cacheGroupDescriptors().values() : cctx.cache().persistentGroups()).stream() .filter(g -> cctx.cache().cacheType(g.cacheOrGroupName()) == CacheType.USER) .map(CacheGroupDescriptor::cacheOrGroupName) + .filter(n -> cacheGroupNames == null || cacheGroupNames.contains(n)) .collect(Collectors.toList()); - grps.add(METASTORAGE_CACHE_NAME); + if (CU.isPersistenceEnabled(cctx.gridConfig()) && !dump) + grps.add(METASTORAGE_CACHE_NAME); List<ClusterNode> srvNodes = cctx.discovery().serverNodes(AffinityTopologyVersion.NONE); snpFut0.listen(f -> { if (f.error() == null) - recordSnapshotEvent(name, SNAPSHOT_FINISHED_MSG + grps, EVT_CLUSTER_SNAPSHOT_FINISHED); + recordSnapshotEvent(name, snpMsg(SNAPSHOT_FINISHED_MSG + grps, dump), EVT_CLUSTER_SNAPSHOT_FINISHED); else { String errMsgPref = f.error() instanceof SnapshotWarningException ? SNAPSHOT_FINISHED_WRN_MSG : SNAPSHOT_FAILED_MSG; - recordSnapshotEvent(name, errMsgPref + f.error().getMessage(), EVT_CLUSTER_SNAPSHOT_FAILED); + recordSnapshotEvent(name, snpMsg(errMsgPref, dump) + f.error().getMessage(), EVT_CLUSTER_SNAPSHOT_FAILED); } }); - Set<UUID> bltNodeIds = - new HashSet<>(F.viewReadOnly(srvNodes, F.node2id(), (node) -> CU.baselineNode(node, clusterState))); + Set<UUID> nodeIds = + new HashSet<>(F.viewReadOnly(srvNodes, F.node2id(), (node) -> dump || CU.baselineNode(node, clusterState))); startSnpProc.start(snpFut0.rqId, new SnapshotOperationRequest( snpFut0.rqId, @@ -2245,16 +2353,17 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter name, snpPath, grps, - bltNodeIds, + nodeIds, incremental, incIdx, - onlyPrimary + onlyPrimary, + dump )); String msg = - "Cluster-wide snapshot operation started [snpName=" + name + ", grps=" + grps + + snpMsg("Cluster-wide snapshot operation started [name=" + name + ", grps=" + grps + (incremental ? "" : (", incremental=true, incrementIndex=" + incIdx)) + - ']'; + ']', dump); recordSnapshotEvent(name, msg, EVT_CLUSTER_SNAPSHOT_STARTED); @@ -2264,9 +2373,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter return new IgniteFutureImpl<>(snpFut0); } catch (Exception e) { - recordSnapshotEvent(name, SNAPSHOT_FAILED_MSG + e.getMessage(), EVT_CLUSTER_SNAPSHOT_FAILED); + recordSnapshotEvent(name, snpMsg(SNAPSHOT_FAILED_MSG + e.getMessage(), dump), EVT_CLUSTER_SNAPSHOT_FAILED); - U.error(log, SNAPSHOT_FAILED_MSG, e); + U.error(log, snpMsg(SNAPSHOT_FAILED_MSG, dump), e); ClusterSnapshotFuture errSnpFut = new ClusterSnapshotFuture(name, e); @@ -2513,6 +2622,14 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter return U.maskForFileName(consId) + SNAPSHOT_METAFILE_EXT; } + /** + * @param consId Consistent node id. + * @return Dump metadata file name. + */ + public static String dumpMetaFileName(String consId) { + return U.maskForFileName(consId) + DUMP_METAFILE_EXT; + } + /** * @param snpDir The full path to the snapshot files. * @param folderName The node folder name, usually it's the same as the U.maskForFileName(consistentId). @@ -2655,6 +2772,17 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter return task; } + /** + * @param snpName Unique snapshot name. + * @param srcNodeId Node id which cause snapshot operation. + * @param requestId Snapshot operation request ID. + * @param grpIds Groups. + * @return Snapshot operation task which should be registered on checkpoint to run. + */ + private IgniteInternalFuture<?> registerDumpTask(String snpName, UUID srcNodeId, UUID requestId, List<Integer> grpIds) { + return new GridFinishedFuture<>("Done!"); + } + /** * Registers a local snapshot task. * @@ -2773,6 +2901,31 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter } } + /** */ + private void removeDumpLock(String dumpName) throws IgniteCheckedException { + File lock = dumpLockFile(snapshotLocalDir(dumpName, null, locDumpDir), cctx); + + if (!lock.exists()) + throw new IgniteCheckedException("Lock file not exists: " + lock); + + if (!lock.delete()) + throw new IgniteCheckedException("Lock file can't be deleted: " + lock); + } + + /** */ + public static File dumpLockFile(File dumpDir, GridCacheSharedContext<?, ?> cctx) throws IgniteCheckedException { + File nodeDumpDir = new File(dumpDir, databaseRelativePath(cctx.kernalContext().pdsFolderResolver().resolveFolders().folderName())); + + if (nodeDumpDir.exists()) { + if (!nodeDumpDir.isDirectory()) + throw new IgniteCheckedException(nodeDumpDir + " must be a directory"); + } + else if (!nodeDumpDir.mkdirs()) + throw new IgniteCheckedException("Dump directory can't be created: " + nodeDumpDir); + + return new File(nodeDumpDir, DUMP_LOCK); + } + /** * Disables creation of incremental snapshots for the given cache group. * @@ -2879,7 +3032,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter * @return Relative configured path of persistence data storage directory for the local node. * Example: {@code snapshotWorkDir/db/IgniteNodeName0} */ - static String databaseRelativePath(String folderName) { + public static String databaseRelativePath(String folderName) { return Paths.get(DB_DEFAULT_FOLDER, folderName).toString(); } @@ -2888,9 +3041,17 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter * @return Snapshot directory resolved through given configuration. */ public static File resolveSnapshotWorkDirectory(IgniteConfiguration cfg) { + return resolveSnapshotWorkDirectory(cfg, cfg.getSnapshotPath()); + } + + /** */ + public static File resolveSnapshotWorkDirectory(IgniteConfiguration cfg, String baseDir) { try { - return U.resolveWorkDirectory(cfg.getWorkDirectory() == null ? U.defaultWorkDirectory() : cfg.getWorkDirectory(), - cfg.getSnapshotPath(), false); + return U.resolveWorkDirectory( + cfg.getWorkDirectory() == null ? U.defaultWorkDirectory() : cfg.getWorkDirectory(), + baseDir, + false + ); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -4527,6 +4688,12 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter /** If {@code true} snapshot only primary copies of partitions. */ private final boolean onlyPrimary; + /** If {@code true} create cache dump. */ + private final boolean dump; + + /** Cache group names. */ + private final Collection<String> cacheGroupNames; + /** Auto-injected grid instance. */ @IgniteInstanceResource private transient IgniteEx ignite; @@ -4534,18 +4701,34 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter /** * @param snpName Snapshot name. */ - public CreateSnapshotCallable(String snpName, boolean incremental, boolean onlyPrimary) { + public CreateSnapshotCallable( + String snpName, + boolean incremental, + boolean onlyPrimary, + boolean dump, + Collection<String> cacheGroupNames + ) { this.snpName = snpName; this.incremental = incremental; this.onlyPrimary = onlyPrimary; + this.dump = dump; + this.cacheGroupNames = cacheGroupNames; } /** {@inheritDoc} */ @Override public Void call() throws Exception { if (incremental) ignite.snapshot().createIncrementalSnapshot(snpName).get(); - else - ignite.context().cache().context().snapshotMgr().createSnapshot(snpName, null, false, onlyPrimary).get(); + else { + ignite.context().cache().context().snapshotMgr().createSnapshot( + snpName, + null, + false, + onlyPrimary, + dump, + cacheGroupNames + ).get(); + } return null; } @@ -4617,4 +4800,12 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter /** @return An instance of {@link R}. */ R create(E1 e1, E2 e2) throws IOException; } + + /** */ + private static String snpMsg(String msg, boolean dump) { + if (!dump) + return msg; + + return msg.replaceAll("Snapshot", "Dump").replaceAll("snapshot", "dump"); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java index 05008820bf4..90777b1cc56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java @@ -91,6 +91,9 @@ public class SnapshotOperationRequest implements Serializable { /** If {@code true} snapshot only primary copies of partitions. */ private final boolean onlyPrimary; + /** If {@code true} then create dump. */ + private final boolean dump; + /** * @param reqId Request ID. * @param opNodeId Operational node ID. @@ -101,6 +104,7 @@ public class SnapshotOperationRequest implements Serializable { * @param incremental {@code True} if incremental snapshot requested. * @param incIdx Incremental snapshot index. * @param onlyPrimary If {@code true} snapshot only primary copies of partitions. + * @param dump If {@code true} then create dump. */ public SnapshotOperationRequest( UUID reqId, @@ -111,7 +115,8 @@ public class SnapshotOperationRequest implements Serializable { Set<UUID> nodes, boolean incremental, int incIdx, - boolean onlyPrimary + boolean onlyPrimary, + boolean dump ) { this.reqId = reqId; this.opNodeId = opNodeId; @@ -122,6 +127,7 @@ public class SnapshotOperationRequest implements Serializable { this.incremental = incremental; this.incIdx = incIdx; this.onlyPrimary = onlyPrimary; + this.dump = dump; startTime = U.currentTimeMillis(); } @@ -196,6 +202,11 @@ public class SnapshotOperationRequest implements Serializable { return onlyPrimary; } + /** @return If {@code true} then create dump. */ + public boolean dump() { + return dump; + } + /** @return Start time. */ public long startTime() { return startTime; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java index f16543a4fa4..6150cf9684e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java @@ -420,7 +420,8 @@ public class SnapshotRestoreProcess { new HashSet<>(bltNodes), false, incIdx, - onlyPrimary + onlyPrimary, + false ); prepareRestoreProc.start(req.requestId(), req); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotSender.java index c48f899fc7d..010ade1952f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotSender.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotSender.java @@ -33,7 +33,7 @@ import org.jetbrains.annotations.Nullable; /** * */ -abstract class SnapshotSender { +public abstract class SnapshotSender { /** Busy processing lock. */ private final ReadWriteLock lock = new ReentrantReadWriteLock(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpCacheFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpCacheFutureTask.java new file mode 100644 index 00000000000..d2090a1ef17 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpCacheFutureTask.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot.dump; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; +import java.util.function.BiConsumer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.AbstractSnapshotFutureTask; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotSender; +import org.jetbrains.annotations.Nullable; + +/** */ +public class DumpCacheFutureTask extends AbstractSnapshotFutureTask<Void> implements BiConsumer<String, File> { + /** */ + private final File dumpDir; + + /** + * @param cctx Cache context. + * @param dumpName Dump name. + * @param srcNodeId Node id which cause snapshot task creation. + * @param reqId Snapshot operation request ID. + * @param tmpWorkDir Working directory for intermediate snapshot results. + * @param ioFactory Factory to working with snapshot files. + */ + public DumpCacheFutureTask( + GridCacheSharedContext<?, ?> cctx, + String dumpName, + UUID srcNodeId, + UUID reqId, + @Nullable String snpPath, + File dumpDir, + File tmpWorkDir, + FileIOFactory ioFactory + ) { + super( + cctx, + srcNodeId, + reqId, + dumpName, + tmpWorkDir, + ioFactory, + new SnapshotSender( + cctx.logger(DumpCacheFutureTask.class), + cctx.kernalContext().pools().getSnapshotExecutorService() + ) { + @Override protected void init(int partsCnt) { + // No-op. + } + + @Override protected void sendPart0(File part, String cacheDirName, GroupPartitionId pair, Long length) { + // No-op. + } + + @Override protected void sendDelta0(File delta, String cacheDirName, GroupPartitionId pair) { + // No-op. + } + }, + null + ); + + this.dumpDir = dumpDir; + + cctx.cache().configManager().addConfigurationChangeListener(this); + } + + /** {@inheritDoc} */ + @Override public boolean start() { + try { + log.info("start!"); + + lockDumpDirectory(); + + onDone(); + } + catch (IgniteCheckedException | IOException e) { + onDone(e); + } + + return false; // Don't wait for checkpoint. + } + + /** */ + private void lockDumpDirectory() throws IgniteCheckedException, IOException { + File lock = IgniteSnapshotManager.dumpLockFile(dumpDir, cctx); + + if (!lock.createNewFile()) + throw new IgniteCheckedException("Lock file can't be created or already exists: " + lock.getAbsolutePath()); + } + + /** {@inheritDoc} */ + @Override public void acceptException(Throwable th) { + + } + + /** {@inheritDoc} */ + @Override public void accept(String s, File file) { + + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpMetadata.java new file mode 100644 index 00000000000..42b969ce6cc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/DumpMetadata.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot.dump; + +import java.io.Serializable; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; + +/** + * + */ +public class DumpMetadata implements Serializable { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Unique dump request id. */ + private final UUID rqId; + + /** Snapshot name. */ + @GridToStringInclude + private final String dumpName; + + /** Consistent id of a node to which this metadata relates. */ + @GridToStringInclude + private final String consId; + + /** The list of cache groups ids which were included into dump. */ + @GridToStringInclude + private final List<Integer> grpIds; + + /** The set of affected by dump nodes. */ + @GridToStringInclude + private final Set<String> nodes; + + /** + * @param rqId Unique request id. + * @param dumpName Dump name. + * @param consId Consistent id of a node to which this metadata relates. + * @param grpIds The list of cache groups ids which were included into dump. + * @param nodes The set of affected by dump nodes. + */ + public DumpMetadata(UUID rqId, String dumpName, String consId, List<Integer> grpIds, Set<String> nodes) { + this.rqId = rqId; + this.dumpName = dumpName; + this.consId = consId; + this.grpIds = grpIds; + this.nodes = nodes; + } + + /** */ + public UUID requestId() { + return rqId; + } + + /** */ + public String dumpName() { + return dumpName; + } + + /** */ + public String consistentId() { + return consId; + } + + /** */ + public List<Integer> groups() { + return grpIds; + } + + /** */ + public Set<String> nodes() { + return nodes; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java index c7985236c1a..d0c3ddd28c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java @@ -516,19 +516,19 @@ public class PoolProcessor extends GridProcessorAdapter { rebalanceExecSvc.allowCoreThreadTimeOut(true); - if (CU.isPersistenceEnabled(ctx.config())) { - snpExecSvc = createExecutorService( - SNAPSHOT_RUNNER_THREAD_PREFIX, - cfg.getIgniteInstanceName(), - cfg.getSnapshotThreadPoolSize(), - cfg.getSnapshotThreadPoolSize(), - DFLT_THREAD_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<>(), - GridIoPolicy.UNDEFINED, - excHnd); + snpExecSvc = createExecutorService( + SNAPSHOT_RUNNER_THREAD_PREFIX, + cfg.getIgniteInstanceName(), + cfg.getSnapshotThreadPoolSize(), + cfg.getSnapshotThreadPoolSize(), + DFLT_THREAD_KEEP_ALIVE_TIME, + new LinkedBlockingQueue<>(), + GridIoPolicy.UNDEFINED, + excHnd); - snpExecSvc.allowCoreThreadTimeOut(true); + snpExecSvc.allowCoreThreadTimeOut(true); + if (CU.isPersistenceEnabled(ctx.config())) { reencryptExecSvc = createExecutorService( "reencrypt", ctx.igniteInstanceName(), diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheDumpSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheDumpSelfTest.java new file mode 100644 index 00000000000..93698443140 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheDumpSelfTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.util.Arrays; +import java.util.Collections; +import java.util.stream.IntStream; +import javax.management.DynamicMBean; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.internal.management.api.CommandMBean.INVOKE; + +/** */ +public class IgniteCacheDumpSelfTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + cleanPersistenceDir(); + } + + /** */ + @Test + public void testCacheDump() throws Exception { + try (IgniteEx ign = startGrid(0)) { + IgniteCache<Object, Object> cache = ign.createCache(DEFAULT_CACHE_NAME); + + IntStream.range(0, 10).forEach(i -> cache.put(i, i)); + + Object[] args = {"dump", ""}; + + String[] signature = new String[args.length]; + + Arrays.fill(signature, String.class.getName()); + + String res = (String)createDumpBean(ign).invoke(INVOKE, args, signature); + + assertTrue(res.isEmpty()); + } + } + + /** */ + private static DynamicMBean createDumpBean(IgniteEx ign) { + DynamicMBean mbean = getMxBean( + ign.context().igniteInstanceName(), + "management", + Collections.singletonList("Dump"), + "Create", + DynamicMBean.class + ); + + assertNotNull(mbean); + + return mbean; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 2f72c87c8dc..62d96b96615 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -157,6 +157,7 @@ import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR; +import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DFLT_DUMPS_DIRECTORY; import static org.apache.ignite.testframework.GridTestUtils.setFieldValue; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; @@ -1972,8 +1973,11 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), DataStorageConfiguration.DFLT_BINARY_METADATA_PATH, false)); - if (!saveSnp) + if (!saveSnp) { U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_SNAPSHOT_DIRECTORY, false)); + U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_DUMPS_DIRECTORY, false)); + } + } /**
