This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d8885b22d0 IGNITE-18674 Introduce async logic to CLI (#1731)
d8885b22d0 is described below
commit d8885b22d05231af53996ace5bbf9fb0564bedc9
Author: Aleksandr Pakhomov <[email protected]>
AuthorDate: Thu Mar 2 18:52:17 2023 +0400
IGNITE-18674 Introduce async logic to CLI (#1731)
Co-authored-by: Vadim Pakhnushev <[email protected]>
---
gradle/libs.versions.toml | 3 +
modules/cli/build.gradle | 1 +
.../internal/cli/commands/sql/SqlReplCommand.java | 5 +-
.../core/call/AbstractCallExecutionPipeline.java | 108 +++++++++++++
.../cli/core/call/{Call.java => AsyncCall.java} | 14 +-
.../cli/core/call/AsyncCallExecutionPipeline.java | 82 ++++++++++
.../call/AsyncCallExecutionPipelineBuilder.java | 128 +++++++++++++++
.../apache/ignite/internal/cli/core/call/Call.java | 1 +
.../cli/core/call/CallExecutionPipeline.java | 177 ++-------------------
.../core/call/CallExecutionPipelineBuilder.java | 107 +++++++++++++
.../call/{Call.java => ProgressBarTracker.java} | 24 +--
.../core/call/{Call.java => ProgressTracker.java} | 13 +-
.../cli/core/call/SingleCallExecutionPipeline.java | 58 +++++++
.../ignite/internal/cli/sql/SqlSchemaProvider.java | 25 ++-
.../internal/cli/core/call/PipelineTest.java | 55 ++++++-
.../internal/cli/sql/SqlSchemaProviderTest.java | 14 +-
16 files changed, 613 insertions(+), 202 deletions(-)
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index ddc93a2355..96a73f7daf 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -72,6 +72,7 @@ testkit = "1.8.2"
openapi = "3.2.0"
autoService = "1.0.1"
awaitility = "4.2.0"
+progressBar = "0.9.4"
#Tools
pmdTool = "6.28.0"
@@ -229,3 +230,5 @@ auto-service = { module =
"com.google.auto.service:auto-service", version.ref =
auto-service-annotations = { module =
"com.google.auto.service:auto-service-annotations", version.ref = "autoService"
}
awaitility = { module = "org.awaitility:awaitility", version.ref =
"awaitility" }
+
+progressBar = { module = "me.tongfei:progressbar", version.ref = "progressBar"
}
diff --git a/modules/cli/build.gradle b/modules/cli/build.gradle
index ea83e10ab1..aa408e17f9 100644
--- a/modules/cli/build.gradle
+++ b/modules/cli/build.gradle
@@ -60,6 +60,7 @@ dependencies {
implementation libs.okhttp.logging
implementation libs.threetenbp
implementation libs.swagger.legacy.annotations
+ implementation libs.progressBar
testAnnotationProcessor libs.picocli.annotation.processor
testAnnotationProcessor libs.micronaut.inject.annotation.processor
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/sql/SqlReplCommand.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/sql/SqlReplCommand.java
index 71cf1948b4..911760001a 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/sql/SqlReplCommand.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/sql/SqlReplCommand.java
@@ -100,7 +100,10 @@ public class SqlReplCommand extends BaseCommand implements
Runnable {
try (SqlManager sqlManager = new SqlManager(jdbc)) {
// When passing white space to this command, picocli will treat it
as a positional argument
if (execOptions == null || (execOptions.command != null &&
execOptions.command.isBlank())) {
- SqlCompleter sqlCompleter = new SqlCompleter(new
SqlSchemaProvider(sqlManager::getMetadata));
+ SqlSchemaProvider schemaProvider = new
SqlSchemaProvider(sqlManager::getMetadata);
+ schemaProvider.initStateAsync();
+
+ SqlCompleter sqlCompleter = new SqlCompleter(schemaProvider);
IgniteSqlCommandCompleter sqlCommandCompleter = new
IgniteSqlCommandCompleter();
replExecutorProvider.get().execute(Repl.builder()
.withPromptProvider(() ->
ansi(fg(Color.GREEN).mark("sql-cli> ")))
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AbstractCallExecutionPipeline.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AbstractCallExecutionPipeline.java
new file mode 100644
index 0000000000..0022c4dcf9
--- /dev/null
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AbstractCallExecutionPipeline.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.core.call;
+
+import java.io.PrintWriter;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.cli.core.decorator.Decorator;
+import org.apache.ignite.internal.cli.core.decorator.TerminalOutput;
+import org.apache.ignite.internal.cli.core.exception.ExceptionHandlers;
+import org.apache.ignite.internal.cli.core.exception.ExceptionWriter;
+import org.apache.ignite.internal.cli.logger.CliLoggers;
+
+/**
+ * Abstract implementation of {@link CallExecutionPipeline}.
+ * Implements common error handling logic and verbose output redirection.
+ */
+public abstract class AbstractCallExecutionPipeline<I extends CallInput, T>
implements CallExecutionPipeline<I, T> {
+
+ /** Writer for execution output. */
+ protected final PrintWriter output;
+
+ /** Writer for error execution output. */
+ protected final PrintWriter errOutput;
+
+ /** Decorator that decorates call's output. */
+ protected final Decorator<T, TerminalOutput> decorator;
+
+ /** Handlers for any exceptions. */
+ protected final ExceptionHandlers exceptionHandlers;
+
+ /** Provider for call's input. */
+ protected final Supplier<I> inputProvider;
+
+ /** If {@code true}, debug output will be printed to console. */
+ protected final boolean verbose;
+
+ AbstractCallExecutionPipeline(
+ PrintWriter output,
+ PrintWriter errOutput,
+ ExceptionHandlers exceptionHandlers,
+ Decorator<T, TerminalOutput> decorator,
+ Supplier<I> inputProvider,
+ boolean verbose
+ ) {
+ this.output = output;
+ this.exceptionHandlers = exceptionHandlers;
+ this.errOutput = errOutput;
+ this.decorator = decorator;
+ this.inputProvider = inputProvider;
+ this.verbose = verbose;
+ }
+
+ /**
+ * Runs the pipeline.
+ *
+ * @return exit code.
+ */
+ @Override
+ public int runPipeline() {
+ try {
+ if (verbose) {
+ CliLoggers.startOutputRedirect(errOutput);
+ }
+ return runPipelineInternal();
+ } finally {
+ if (verbose) {
+ CliLoggers.stopOutputRedirect();
+ }
+ }
+ }
+
+ int handleResult(CallOutput<T> callOutput) {
+ if (callOutput.hasError()) {
+ return handleException(callOutput.errorCause());
+ }
+
+ if (!callOutput.isEmpty()) {
+ TerminalOutput decoratedOutput =
decorator.decorate(callOutput.body());
+ output.println(decoratedOutput.toTerminalString());
+ }
+
+ return 0;
+ }
+
+ /**
+ * Runs the pipeline and blocks the thread until the result is ready.
+ */
+ protected abstract int runPipelineInternal();
+
+ int handleException(Throwable error) {
+ return
exceptionHandlers.handleException(ExceptionWriter.fromPrintWriter(errOutput),
error);
+ }
+}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCall.java
similarity index 72%
copy from
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
copy to
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCall.java
index 1870d732c0..26c060e644 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCall.java
@@ -17,13 +17,13 @@
package org.apache.ignite.internal.cli.core.call;
+import java.util.concurrent.CompletableFuture;
+
/**
- * Call that represents an action that can be performed given an input.
- * It can be rest call, dictionary lookup or whatever.
- *
- * @param <IT> Input for the call.
- * @param <OT> Output of the call.
+ * Call that represents an asynchronous action that can be performed given an
input.
+ * It can be REST call, dictionary lookup or whatever.
*/
-public interface Call<IT extends CallInput, OT> {
- CallOutput<OT> execute(IT input);
+@FunctionalInterface
+public interface AsyncCall<IT extends CallInput, OT> {
+ CompletableFuture<CallOutput<OT>> execute(IT input);
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipeline.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipeline.java
new file mode 100644
index 0000000000..9eb9db7740
--- /dev/null
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipeline.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.core.call;
+
+import java.io.PrintWriter;
+import java.util.concurrent.CompletionException;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import me.tongfei.progressbar.DelegatingProgressBarConsumer;
+import me.tongfei.progressbar.ProgressBar;
+import me.tongfei.progressbar.ProgressBarBuilder;
+import org.apache.ignite.internal.cli.core.decorator.Decorator;
+import org.apache.ignite.internal.cli.core.decorator.TerminalOutput;
+import org.apache.ignite.internal.cli.core.exception.ExceptionHandlers;
+
+/** Call execution pipeline that executes an async call and displays progress
bar. */
+public class AsyncCallExecutionPipeline<I extends CallInput, T> extends
AbstractCallExecutionPipeline<I, T> {
+ /** Async call factory. */
+ private final Function<ProgressTracker, AsyncCall<I, T>> callFactory;
+
+ /** Builder for progress bar rendering. */
+ private final ProgressBarBuilder progressBarBuilder;
+
+ AsyncCallExecutionPipeline(
+ Function<ProgressTracker, AsyncCall<I, T>> callFactory,
+ ProgressBarBuilder progressBarBuilder,
+ PrintWriter output,
+ PrintWriter errOutput,
+ ExceptionHandlers exceptionHandlers,
+ Decorator<T, TerminalOutput> decorator,
+ Supplier<I> inputProvider,
+ boolean verbose
+ ) {
+ super(output, errOutput, exceptionHandlers, decorator, inputProvider,
verbose);
+ this.callFactory = callFactory;
+ this.progressBarBuilder = progressBarBuilder;
+ }
+
+ @Override
+ public int runPipelineInternal() {
+ I callInput = inputProvider.get();
+
+ progressBarBuilder.setConsumer(new
DelegatingProgressBarConsumer(this::print));
+ ProgressBar progressBar = progressBarBuilder.build();
+
+ try {
+ CallOutput<T> result = callFactory.apply(new
ProgressBarTracker(progressBar))
+ .execute(callInput)
+ .whenComplete((el, err) -> progressBar.close())
+ .join();
+
+ // move carriage to the next line
+ output.println();
+
+ return handleResult(result);
+ } catch (CompletionException e) {
+ return handleException(e.getCause());
+ } catch (Exception e) {
+ return handleException(e);
+ }
+ }
+
+ private void print(String s) {
+ output.print("\r" + s); // carriage return to the beginning of the
line to overwrite the previous progress bar
+ output.flush();
+ }
+}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipelineBuilder.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipelineBuilder.java
new file mode 100644
index 0000000000..9279bff18b
--- /dev/null
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipelineBuilder.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.core.call;
+
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+import java.time.temporal.ChronoUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import me.tongfei.progressbar.ProgressBarBuilder;
+import me.tongfei.progressbar.ProgressBarStyle;
+import org.apache.ignite.internal.cli.core.decorator.Decorator;
+import org.apache.ignite.internal.cli.core.decorator.TerminalOutput;
+import org.apache.ignite.internal.cli.core.exception.ExceptionHandler;
+import org.apache.ignite.internal.cli.core.exception.ExceptionHandlers;
+import
org.apache.ignite.internal.cli.core.exception.handler.DefaultExceptionHandlers;
+import org.apache.ignite.internal.cli.decorators.DefaultDecorator;
+
+/** Builder for {@link AsyncCallExecutionPipeline}. */
+public class AsyncCallExecutionPipelineBuilder<I extends CallInput, T> {
+
+ private final Function<ProgressTracker, AsyncCall<I, T>> callFactory;
+
+ private final ProgressBarBuilder progressBarBuilder = new
ProgressBarBuilder()
+ .setStyle(ProgressBarStyle.UNICODE_BLOCK)
+ .continuousUpdate()
+ .setSpeedUnit(ChronoUnit.SECONDS)
+ .setInitialMax(100)
+ .hideETA()
+ .setTaskName("")
+ .showSpeed();
+
+ private final ExceptionHandlers exceptionHandlers = new
DefaultExceptionHandlers();
+
+ private Supplier<I> inputProvider;
+
+ private PrintWriter output = wrapOutputStream(System.out);
+
+ private PrintWriter errOutput = wrapOutputStream(System.err);
+
+ private Decorator<T, TerminalOutput> decorator = new DefaultDecorator<>();
+
+ private boolean verbose;
+
+ AsyncCallExecutionPipelineBuilder(Function<ProgressTracker, AsyncCall<I,
T>> callFactory) {
+ this.callFactory = callFactory;
+ }
+
+ private static PrintWriter wrapOutputStream(OutputStream output) {
+ return new PrintWriter(output, true, getStdoutEncoding());
+ }
+
+ private static Charset getStdoutEncoding() {
+ String encoding = System.getProperty("sun.stdout.encoding");
+ return encoding != null ? Charset.forName(encoding) :
Charset.defaultCharset();
+ }
+
+ public AsyncCallExecutionPipelineBuilder<I, T> inputProvider(Supplier<I>
inputProvider) {
+ this.inputProvider = inputProvider;
+ return this;
+ }
+
+ public AsyncCallExecutionPipelineBuilder<I, T> output(PrintWriter output) {
+ this.output = output;
+ return this;
+ }
+
+ public AsyncCallExecutionPipelineBuilder<I, T> output(OutputStream output)
{
+ return output(wrapOutputStream(output));
+ }
+
+ public AsyncCallExecutionPipelineBuilder<I, T> errOutput(PrintWriter
errOutput) {
+ this.errOutput = errOutput;
+ return this;
+ }
+
+ public AsyncCallExecutionPipelineBuilder<I, T> errOutput(OutputStream
output) {
+ return errOutput(wrapOutputStream(output));
+ }
+
+ public AsyncCallExecutionPipelineBuilder<I, T>
exceptionHandler(ExceptionHandler<?> exceptionHandler) {
+ exceptionHandlers.addExceptionHandler(exceptionHandler);
+ return this;
+ }
+
+ public AsyncCallExecutionPipelineBuilder<I, T>
exceptionHandlers(ExceptionHandlers exceptionHandlers) {
+ this.exceptionHandlers.addExceptionHandlers(exceptionHandlers);
+ return this;
+ }
+
+ public AsyncCallExecutionPipelineBuilder<I, T> decorator(Decorator<T,
TerminalOutput> decorator) {
+ this.decorator = decorator;
+ return this;
+ }
+
+ public AsyncCallExecutionPipelineBuilder<I, T> verbose(boolean verbose) {
+ this.verbose = verbose;
+ return this;
+ }
+
+ public AsyncCallExecutionPipelineBuilder<I, T> name(String name) {
+ this.progressBarBuilder.setTaskName(name);
+ return this;
+ }
+
+ /** Builds {@link AsyncCallExecutionPipeline}. */
+ public CallExecutionPipeline<I, T> build() {
+ return new AsyncCallExecutionPipeline<>(
+ callFactory, progressBarBuilder, output, errOutput,
exceptionHandlers, decorator, inputProvider, verbose
+ );
+ }
+}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
index 1870d732c0..4ae4568125 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
@@ -24,6 +24,7 @@ package org.apache.ignite.internal.cli.core.call;
* @param <IT> Input for the call.
* @param <OT> Output of the call.
*/
+@FunctionalInterface
public interface Call<IT extends CallInput, OT> {
CallOutput<OT> execute(IT input);
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipeline.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipeline.java
index 5d265b0d87..a8fe9db3fb 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipeline.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipeline.java
@@ -17,182 +17,31 @@
package org.apache.ignite.internal.cli.core.call;
-import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.nio.charset.Charset;
-import java.util.function.Supplier;
-import org.apache.ignite.internal.cli.core.decorator.Decorator;
-import org.apache.ignite.internal.cli.core.decorator.TerminalOutput;
-import org.apache.ignite.internal.cli.core.exception.ExceptionHandler;
-import org.apache.ignite.internal.cli.core.exception.ExceptionHandlers;
-import org.apache.ignite.internal.cli.core.exception.ExceptionWriter;
-import
org.apache.ignite.internal.cli.core.exception.handler.DefaultExceptionHandlers;
-import org.apache.ignite.internal.cli.decorators.DefaultDecorator;
-import org.apache.ignite.internal.cli.logger.CliLoggers;
-
-/**
- * Call execution pipeline.
- *
- * @param <I> Call input type.
- * @param <T> Call output's body type.
- */
-public class CallExecutionPipeline<I extends CallInput, T> {
- /** Call to execute. */
- private final Call<I, T> call;
-
- /** Writer for execution output. */
- private final PrintWriter output;
-
- /** Writer for error execution output. */
- private final PrintWriter errOutput;
-
- /** Decorator that decorates call's output. */
- private final Decorator<T, TerminalOutput> decorator;
-
- /** Handlers for any exceptions. */
- private final ExceptionHandlers exceptionHandlers;
-
- /** Provider for call's input. */
- private final Supplier<I> inputProvider;
-
- /** If {@code true}, debug output will be printed to console. */
- private final boolean verbose;
-
- private CallExecutionPipeline(Call<I, T> call,
- PrintWriter output,
- PrintWriter errOutput,
- ExceptionHandlers exceptionHandlers,
- Decorator<T, TerminalOutput> decorator,
- Supplier<I> inputProvider,
- boolean verbose
- ) {
- this.call = call;
- this.output = output;
- this.exceptionHandlers = exceptionHandlers;
- this.errOutput = errOutput;
- this.decorator = decorator;
- this.inputProvider = inputProvider;
- this.verbose = verbose;
- }
+import java.util.function.Function;
+/** Pipeline that executes a call. */
+@FunctionalInterface
+public interface CallExecutionPipeline<I extends CallInput, T> {
/**
* Builder helper method.
*
* @return builder for {@link CallExecutionPipeline}.
*/
- public static <I extends CallInput, T> CallExecutionPipelineBuilder<I, T>
builder(Call<I, T> call) {
+ static <I extends CallInput, T> CallExecutionPipelineBuilder<I, T>
builder(Call<I, T> call) {
return new CallExecutionPipelineBuilder<>(call);
}
+ /** Builder helper method. */
+ static <I extends CallInput, T> AsyncCallExecutionPipelineBuilder<I, T>
asyncBuilder(
+ Function<ProgressTracker, AsyncCall<I, T>> callFactory
+ ) {
+ return new AsyncCallExecutionPipelineBuilder<>(callFactory);
+ }
+
/**
* Runs the pipeline.
*
* @return exit code.
*/
- public int runPipeline() {
- try {
- if (verbose) {
- CliLoggers.startOutputRedirect(errOutput);
- }
- return runPipelineInternal();
- } finally {
- if (verbose) {
- CliLoggers.stopOutputRedirect();
- }
- }
- }
-
- private int runPipelineInternal() {
- I callInput = inputProvider.get();
-
- CallOutput<T> callOutput = call.execute(callInput);
-
- if (callOutput.hasError()) {
- return
exceptionHandlers.handleException(ExceptionWriter.fromPrintWriter(errOutput),
callOutput.errorCause());
- }
-
- if (!callOutput.isEmpty()) {
- TerminalOutput decoratedOutput =
decorator.decorate(callOutput.body());
- output.println(decoratedOutput.toTerminalString());
- }
- return 0;
- }
-
- /** Builder for {@link CallExecutionPipeline}. */
- public static class CallExecutionPipelineBuilder<I extends CallInput, T> {
-
- private final Call<I, T> call;
-
- private final ExceptionHandlers exceptionHandlers = new
DefaultExceptionHandlers();
-
- private Supplier<I> inputProvider;
-
- private PrintWriter output = wrapOutputStream(System.out);
-
- private PrintWriter errOutput = wrapOutputStream(System.err);
-
- private Decorator<T, TerminalOutput> decorator = new
DefaultDecorator<>();
-
- private boolean verbose;
-
- public CallExecutionPipelineBuilder(Call<I, T> call) {
- this.call = call;
- }
-
- public CallExecutionPipelineBuilder<I, T> inputProvider(Supplier<I>
inputProvider) {
- this.inputProvider = inputProvider;
- return this;
- }
-
- public CallExecutionPipelineBuilder<I, T> output(PrintWriter output) {
- this.output = output;
- return this;
- }
-
- public CallExecutionPipelineBuilder<I, T> output(OutputStream output) {
- return output(wrapOutputStream(output));
- }
-
- public CallExecutionPipelineBuilder<I, T> errOutput(PrintWriter
errOutput) {
- this.errOutput = errOutput;
- return this;
- }
-
- public CallExecutionPipelineBuilder<I, T> errOutput(OutputStream
output) {
- return errOutput(wrapOutputStream(output));
- }
-
- public CallExecutionPipelineBuilder<I, T>
exceptionHandler(ExceptionHandler<?> exceptionHandler) {
- exceptionHandlers.addExceptionHandler(exceptionHandler);
- return this;
- }
-
- public CallExecutionPipelineBuilder<I, T>
exceptionHandlers(ExceptionHandlers exceptionHandlers) {
- this.exceptionHandlers.addExceptionHandlers(exceptionHandlers);
- return this;
- }
-
- public CallExecutionPipelineBuilder<I, T> decorator(Decorator<T,
TerminalOutput> decorator) {
- this.decorator = decorator;
- return this;
- }
-
- public CallExecutionPipelineBuilder<I, T> verbose(boolean verbose) {
- this.verbose = verbose;
- return this;
- }
-
- public CallExecutionPipeline<I, T> build() {
- return new CallExecutionPipeline<>(call, output, errOutput,
exceptionHandlers, decorator, inputProvider, verbose);
- }
-
- private static PrintWriter wrapOutputStream(OutputStream output) {
- return new PrintWriter(output, true, getStdoutEncoding());
- }
-
- private static Charset getStdoutEncoding() {
- String encoding = System.getProperty("sun.stdout.encoding");
- return encoding != null ? Charset.forName(encoding) :
Charset.defaultCharset();
- }
- }
+ int runPipeline();
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipelineBuilder.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipelineBuilder.java
new file mode 100644
index 0000000000..321ac7e391
--- /dev/null
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipelineBuilder.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.core.call;
+
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.cli.core.decorator.Decorator;
+import org.apache.ignite.internal.cli.core.decorator.TerminalOutput;
+import org.apache.ignite.internal.cli.core.exception.ExceptionHandler;
+import org.apache.ignite.internal.cli.core.exception.ExceptionHandlers;
+import
org.apache.ignite.internal.cli.core.exception.handler.DefaultExceptionHandlers;
+import org.apache.ignite.internal.cli.decorators.DefaultDecorator;
+
+/** Builder for {@link CallExecutionPipeline}. */
+public class CallExecutionPipelineBuilder<I extends CallInput, T> {
+
+ private final Call<I, T> call;
+
+ private final ExceptionHandlers exceptionHandlers = new
DefaultExceptionHandlers();
+
+ private Supplier<I> inputProvider;
+
+ private PrintWriter output = wrapOutputStream(System.out);
+
+ private PrintWriter errOutput = wrapOutputStream(System.err);
+
+ private Decorator<T, TerminalOutput> decorator = new DefaultDecorator<>();
+
+ private boolean verbose;
+
+ CallExecutionPipelineBuilder(Call<I, T> call) {
+ this.call = call;
+ }
+
+ private static PrintWriter wrapOutputStream(OutputStream output) {
+ return new PrintWriter(output, true, getStdoutEncoding());
+ }
+
+ private static Charset getStdoutEncoding() {
+ String encoding = System.getProperty("sun.stdout.encoding");
+ return encoding != null ? Charset.forName(encoding) :
Charset.defaultCharset();
+ }
+
+ public CallExecutionPipelineBuilder<I, T> inputProvider(Supplier<I>
inputProvider) {
+ this.inputProvider = inputProvider;
+ return this;
+ }
+
+ public CallExecutionPipelineBuilder<I, T> output(PrintWriter output) {
+ this.output = output;
+ return this;
+ }
+
+ public CallExecutionPipelineBuilder<I, T> output(OutputStream output) {
+ return output(wrapOutputStream(output));
+ }
+
+ public CallExecutionPipelineBuilder<I, T> errOutput(PrintWriter errOutput)
{
+ this.errOutput = errOutput;
+ return this;
+ }
+
+ public CallExecutionPipelineBuilder<I, T> errOutput(OutputStream output) {
+ return errOutput(wrapOutputStream(output));
+ }
+
+ public CallExecutionPipelineBuilder<I, T>
exceptionHandler(ExceptionHandler<?> exceptionHandler) {
+ exceptionHandlers.addExceptionHandler(exceptionHandler);
+ return this;
+ }
+
+ public CallExecutionPipelineBuilder<I, T>
exceptionHandlers(ExceptionHandlers exceptionHandlers) {
+ this.exceptionHandlers.addExceptionHandlers(exceptionHandlers);
+ return this;
+ }
+
+ public CallExecutionPipelineBuilder<I, T> decorator(Decorator<T,
TerminalOutput> decorator) {
+ this.decorator = decorator;
+ return this;
+ }
+
+ public CallExecutionPipelineBuilder<I, T> verbose(boolean verbose) {
+ this.verbose = verbose;
+ return this;
+ }
+
+ public CallExecutionPipeline<I, T> build() {
+ return new SingleCallExecutionPipeline<>(call, output, errOutput,
exceptionHandlers, decorator, inputProvider, verbose);
+ }
+}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/ProgressBarTracker.java
similarity index 68%
copy from
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
copy to
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/ProgressBarTracker.java
index 1870d732c0..a1beaa70eb 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/ProgressBarTracker.java
@@ -17,13 +17,19 @@
package org.apache.ignite.internal.cli.core.call;
-/**
- * Call that represents an action that can be performed given an input.
- * It can be rest call, dictionary lookup or whatever.
- *
- * @param <IT> Input for the call.
- * @param <OT> Output of the call.
- */
-public interface Call<IT extends CallInput, OT> {
- CallOutput<OT> execute(IT input);
+import me.tongfei.progressbar.ProgressBar;
+
+/** {@link ProgressBar} based tracker. */
+public class ProgressBarTracker implements ProgressTracker {
+ private final ProgressBar progressBar;
+
+ public ProgressBarTracker(ProgressBar progressBar) {
+ this.progressBar = progressBar;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void track() {
+ progressBar.step();
+ }
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/ProgressTracker.java
similarity index 74%
copy from
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
copy to
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/ProgressTracker.java
index 1870d732c0..9e127230cb 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/Call.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/ProgressTracker.java
@@ -17,13 +17,8 @@
package org.apache.ignite.internal.cli.core.call;
-/**
- * Call that represents an action that can be performed given an input.
- * It can be rest call, dictionary lookup or whatever.
- *
- * @param <IT> Input for the call.
- * @param <OT> Output of the call.
- */
-public interface Call<IT extends CallInput, OT> {
- CallOutput<OT> execute(IT input);
+/** Progress tracker that will be called periodically during the call
execution. */
+public interface ProgressTracker {
+ /** Tracks that the step is performed. */
+ void track(); // todo: add the increment parameter in
https://issues.apache.org/jira/browse/IGNITE-18731
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/SingleCallExecutionPipeline.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/SingleCallExecutionPipeline.java
new file mode 100644
index 0000000000..23e2b5e307
--- /dev/null
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/SingleCallExecutionPipeline.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.core.call;
+
+import java.io.PrintWriter;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.cli.core.decorator.Decorator;
+import org.apache.ignite.internal.cli.core.decorator.TerminalOutput;
+import org.apache.ignite.internal.cli.core.exception.ExceptionHandlers;
+
+/**
+ * Call execution pipeline that executes a single call.
+ *
+ * @param <I> Call input type.
+ * @param <T> Call output's body type.
+ */
+public class SingleCallExecutionPipeline<I extends CallInput, T> extends
AbstractCallExecutionPipeline<I, T> {
+ /** Call to execute. */
+ private final Call<I, T> call;
+
+ SingleCallExecutionPipeline(
+ Call<I, T> call,
+ PrintWriter output,
+ PrintWriter errOutput,
+ ExceptionHandlers exceptionHandlers,
+ Decorator<T, TerminalOutput> decorator,
+ Supplier<I> inputProvider,
+ boolean verbose
+ ) {
+ super(output, errOutput, exceptionHandlers, decorator, inputProvider,
verbose);
+ this.call = call;
+ }
+
+
+ @Override
+ public int runPipelineInternal() {
+ I callInput = inputProvider.get();
+
+ CallOutput<T> callOutput = call.execute(callInput);
+
+ return handleResult(callOutput);
+ }
+}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/sql/SqlSchemaProvider.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/sql/SqlSchemaProvider.java
index 85906f7f7a..5924efeb85 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/sql/SqlSchemaProvider.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/sql/SqlSchemaProvider.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.cli.sql;
import java.time.Duration;
import java.time.Instant;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
/**
* SQL schema provider.
@@ -30,9 +32,9 @@ public class SqlSchemaProvider implements SchemaProvider {
private final int schemaUpdateTimeout;
- private SqlSchema schema;
+ private final AtomicReference<SqlSchema> schema = new
AtomicReference<>(null);
- private Instant lastUpdate;
+ private final AtomicReference<Instant> lastUpdate = new
AtomicReference<>(Instant.now());
public SqlSchemaProvider(MetadataSupplier metadataSupplier) {
this(metadataSupplier, SCHEMA_UPDATE_TIMEOUT);
@@ -45,11 +47,22 @@ public class SqlSchemaProvider implements SchemaProvider {
@Override
public SqlSchema getSchema() {
- if (schema == null || Duration.between(lastUpdate,
Instant.now()).toSeconds() >= schemaUpdateTimeout) {
- schema = sqlSchemaLoader.loadSchema();
- lastUpdate = Instant.now();
+ if (schema.compareAndSet(null, sqlSchemaLoader.loadSchema())) {
+ lastUpdate.set(Instant.now());
+ return schema.get();
+ } else if (Duration.between(lastUpdate.get(),
Instant.now()).toSeconds() >= schemaUpdateTimeout) {
+ CompletableFuture.supplyAsync(() -> {
+ schema.set(sqlSchemaLoader.loadSchema());
+ lastUpdate.set(Instant.now());
+ return schema.get();
+ });
}
- return schema;
+
+ return schema.get();
}
+ public void initStateAsync() {
+ // trigger schema loading in background
+ CompletableFuture.supplyAsync(this::getSchema);
+ }
}
diff --git
a/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/call/PipelineTest.java
b/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/call/PipelineTest.java
index a1aac24059..d34b2b1cb9 100644
---
a/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/call/PipelineTest.java
+++
b/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/call/PipelineTest.java
@@ -24,17 +24,24 @@ import static org.hamcrest.Matchers.startsWith;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.cli.core.exception.TestExceptionHandler;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class PipelineTest {
+ private StringWriter out = new StringWriter();
+ private StringWriter errOut = new StringWriter();
+
+ @BeforeEach
+ void setUp() {
+ out = new StringWriter();
+ errOut = new StringWriter();
+ }
+
@Test
void verboseTest() {
- // Given
- StringWriter out = new StringWriter();
- StringWriter errOut = new StringWriter();
-
// When start pipeline with verbose
CallExecutionPipeline.builder(new ThrowingStrCall())
.inputProvider(StringCallInput::new)
@@ -49,4 +56,44 @@ class PipelineTest {
assertThat(errOut.toString(), startsWith("Ooops!" +
System.lineSeparator()));
assertThat(errOut.toString(), containsString("verbose output"));
}
+
+ @Test
+ void asyncCallFailedFuture() {
+ // Given async call that retuns failed future
+ AsyncCall<StringCallInput, ?> asyncCall = callInput ->
CompletableFuture.failedFuture(new RuntimeException("Ooops!"));
+
+ // When start async pipeline with verbose
+ CallExecutionPipeline.asyncBuilder(ignoredProgressTracker -> asyncCall)
+ .inputProvider(StringCallInput::new)
+ .exceptionHandler(new TestExceptionHandler())
+ .output(new PrintWriter(out))
+ .errOutput(new PrintWriter(errOut))
+ .verbose(true)
+ .build().runPipeline();
+
+ // Then error output contains the message from exception and contains
verbose output
+ assertThat(errOut.toString(), containsString("Ooops!" +
System.lineSeparator()));
+ assertThat(errOut.toString(), containsString("verbose output"));
+ }
+
+ @Test
+ void asyncCallThrowingMethod() {
+ // Given async call that throws an exception
+ AsyncCall<StringCallInput, ?> asyncCall = callInput -> {
+ throw new RuntimeException("Ooops!");
+ };
+
+ // When start async pipeline with verbose
+ CallExecutionPipeline.asyncBuilder(ignoredProgressTracker -> asyncCall)
+ .inputProvider(StringCallInput::new)
+ .exceptionHandler(new TestExceptionHandler())
+ .output(new PrintWriter(out))
+ .errOutput(new PrintWriter(errOut))
+ .verbose(true)
+ .build().runPipeline();
+
+ // Then error output contains the message from exception and contains
verbose output
+ assertThat(errOut.toString(), containsString("Ooops!" +
System.lineSeparator()));
+ assertThat(errOut.toString(), containsString("verbose output"));
+ }
}
diff --git
a/modules/cli/src/test/java/org/apache/ignite/internal/cli/sql/SqlSchemaProviderTest.java
b/modules/cli/src/test/java/org/apache/ignite/internal/cli/sql/SqlSchemaProviderTest.java
index e96ecc35ca..37030c8b77 100644
---
a/modules/cli/src/test/java/org/apache/ignite/internal/cli/sql/SqlSchemaProviderTest.java
+++
b/modules/cli/src/test/java/org/apache/ignite/internal/cli/sql/SqlSchemaProviderTest.java
@@ -44,16 +44,26 @@ class SqlSchemaProviderTest {
}
@Test
- public void testProviderWithoutTimeout() {
+ public void testProviderWithoutTimeout() throws InterruptedException {
SqlSchemaProvider provider = new SqlSchemaProvider(supplier, 0);
- Assertions.assertNotEquals(provider.getSchema(), provider.getSchema());
+
+ SqlSchema firstSchema = provider.getSchema();
+ provider.getSchema(); // trigger update
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1));
+
+ Assertions.assertNotEquals(firstSchema, provider.getSchema());
}
@Test
public void testProviderWith1secTimeout() throws InterruptedException {
SqlSchemaProvider provider = new SqlSchemaProvider(supplier, 1);
+
+ provider.initStateAsync();
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1));
+
SqlSchema schema = provider.getSchema();
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
+
Assertions.assertNotEquals(schema, provider.getSchema());
}
}