This is an automated email from the ASF dual-hosted git repository.

jsinovassinnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git


The following commit(s) were added to refs/heads/master by this push:
     new 315c4bfa9 [UNOMI-853] Adapt migration job to use asynchronous mode 
avoiding timeout and connection lost (#697)
315c4bfa9 is described below

commit 315c4bfa9c85d744370d5d958eda98e60e452b65
Author: Jérôme Blanchard <[email protected]>
AuthorDate: Wed Oct 9 12:38:36 2024 +0200

    [UNOMI-853] Adapt migration job to use asynchronous mode avoiding timeout 
and connection lost (#697)
    
    * UNOMI-853: Adapt migration to handle long time reindex tasks with 
asynchronous mode.
    
    * UNOMI-853: Fix small typo and update documentation pointing to broken 
karaf link
    
    * UNOMI-853: Finally log a task waiting info every 15s.
---
 manual/src/main/asciidoc/configuration.adoc        |  2 +-
 .../shell/migration/utils/MigrationUtils.java      | 42 +++++++++++++++++++---
 2 files changed, 39 insertions(+), 5 deletions(-)

diff --git a/manual/src/main/asciidoc/configuration.adoc 
b/manual/src/main/asciidoc/configuration.adoc
index 4b3ae9c7d..5f9b7f677 100644
--- a/manual/src/main/asciidoc/configuration.adoc
+++ b/manual/src/main/asciidoc/configuration.adoc
@@ -637,7 +637,7 @@ highly recommended that you design your client applications 
to use the HTTPS por
 The user accounts to access the REST API are actually routed through Karaf's 
JAAS support, which you may find the
 documentation for here :
 
-* 
http://karaf.apache.org/manual/latest/users-guide/security.html[http://karaf.apache.org/manual/latest/users-guide/security.html]
+* 
https://karaf.apache.org/manual/latest/#_security_2[https://karaf.apache.org/manual/latest/#_security_2]
 
 The default username/password is
 
diff --git 
a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/utils/MigrationUtils.java
 
b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/utils/MigrationUtils.java
index 2050d6ba5..2059f1dc8 100644
--- 
a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/utils/MigrationUtils.java
+++ 
b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/utils/MigrationUtils.java
@@ -28,6 +28,8 @@ import org.json.JSONArray;
 import org.json.JSONObject;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.*;
 import java.net.URL;
@@ -42,6 +44,8 @@ import static 
org.apache.unomi.shell.migration.service.MigrationConfig.*;
  */
 public class MigrationUtils {
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(MigrationUtils.class);
+
     public static JSONObject queryWithScroll(CloseableHttpClient httpClient, 
String url) throws IOException {
         url += "?scroll=1m";
 
@@ -129,7 +133,7 @@ public class MigrationUtils {
             if (predefinedMappings == null) {
                 continue;
             }
-            while (predefinedMappings.hasMoreElements()) {
+            if (predefinedMappings.hasMoreElements()) {
                 URL predefinedMappingURL = predefinedMappings.nextElement();
                 return IOUtils.toString(predefinedMappingURL);
             }
@@ -176,7 +180,10 @@ public class MigrationUtils {
     public static void moveToIndex(CloseableHttpClient httpClient, 
BundleContext bundleContext, String esAddress, String sourceIndexName, String 
targetIndexName, String painlessScript) throws Exception {
         String reIndexRequest = resourceAsString(bundleContext, 
"requestBody/2.2.0/base_reindex_request.json").replace("#source", 
sourceIndexName).replace("#dest", targetIndexName).replace("#painless", 
StringUtils.isNotEmpty(painlessScript) ? getScriptPart(painlessScript) : "");
 
-        HttpUtils.executePostRequest(httpClient, esAddress + "/_reindex", 
reIndexRequest, null);
+        // Reindex
+        JSONObject task = new 
JSONObject(HttpUtils.executePostRequest(httpClient, esAddress + 
"/_reindex?wait_for_completion=false", reIndexRequest, null));
+        //Wait for the reindex task to finish
+        waitForTaskToFinish(httpClient, esAddress, task.getString("task"), 
null);
     }
 
     public static void deleteIndex(CloseableHttpClient httpClient, String 
esAddress, String indexName) throws Exception {
@@ -216,7 +223,9 @@ public class MigrationUtils {
             // Recreate the original index with new mappings
             HttpUtils.executePutRequest(httpClient, esAddress + "/" + 
indexName, newIndexSettings, null);
             // Reindex data from clone
-            HttpUtils.executePostRequest(httpClient, esAddress + "/_reindex", 
reIndexRequest, null);
+            JSONObject task = new 
JSONObject(HttpUtils.executePostRequest(httpClient, esAddress + 
"/_reindex?wait_for_completion=false", reIndexRequest, null));
+            //Wait for the reindex task to finish
+            waitForTaskToFinish(httpClient, esAddress, task.getString("task"), 
migrationContext);
         });
 
         migrationContext.performMigrationStep("Reindex step for: " + indexName 
+ " (delete clone)", () -> {
@@ -249,7 +258,7 @@ public class MigrationUtils {
             }
 
             // no more results, delete scroll
-            if (hits.length() == 0) {
+            if (hits.isEmpty()) {
                 if (scrollId != null) {
                     HttpUtils.executeDeleteRequest(httpClient, esAddress + 
"/_search/scroll/" + scrollId, null);
                 }
@@ -281,6 +290,31 @@ public class MigrationUtils {
 
     }
 
+    public static void waitForTaskToFinish(CloseableHttpClient httpClient, 
String esAddress, String taskId, MigrationContext migrationContext) throws 
IOException {
+        while (true) {
+            final JSONObject status = new JSONObject(
+                    HttpUtils.executeGetRequest(httpClient, esAddress + 
"/_tasks/" + taskId + "?wait_for_completion=true&timeout=15s",
+                            null));
+            if (status.has("completed") && status.getBoolean("completed")) {
+                if (migrationContext != null) {
+                    migrationContext.printMessage("Task is completed");
+                } else {
+                    LOGGER.info("Task is completed");
+                }
+                break;
+            }
+            if (status.has("error")) {
+                final JSONObject error = status.getJSONObject("error");
+                throw new IOException("Task error: " + error.getString("type") 
+ " - " + error.getString("reason"));
+            }
+            if (migrationContext != null) {
+                migrationContext.printMessage("Waiting for Task " + taskId + " 
to complete");
+            } else {
+                LOGGER.info("Waiting for Task {} to complete", taskId);
+            }
+        }
+    }
+
     public interface ScrollCallback {
         void execute(String hits);
     }

Reply via email to