This is an automated email from the ASF dual-hosted git repository.
shuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push:
new 77c2ef4ba UNOMI-505 Study replication of existing profileIDs into new
alias index (#440)
77c2ef4ba is described below
commit 77c2ef4baea6de949cd56e2836a1eb838a14d9c6
Author: anatol-sialitski <[email protected]>
AuthorDate: Tue Jun 21 13:56:26 2022 +0200
UNOMI-505 Study replication of existing profileIDs into new alias index
(#440)
---
.../unomi/shell/migration/impl/MigrationTo200.java | 127 ++++++++++++++++++++-
.../requestBody/bulkSaveProfileAliases.ndjson | 2 +
2 files changed, 128 insertions(+), 1 deletion(-)
diff --git
a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
index 9dc873b6e..410fbd16e 100644
---
a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
+++
b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java
@@ -19,6 +19,7 @@ package org.apache.unomi.shell.migration.impl;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
@@ -28,6 +29,7 @@ import org.apache.http.util.EntityUtils;
import org.apache.karaf.shell.api.console.Session;
import org.apache.unomi.shell.migration.Migration;
import org.apache.unomi.shell.migration.utils.ConsoleUtils;
+import org.json.JSONArray;
import org.json.JSONObject;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Version;
@@ -35,8 +37,10 @@ import org.osgi.service.component.annotations.Component;
import java.io.IOException;
import java.io.InputStream;
+import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
+import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -65,7 +69,8 @@ public class MigrationTo200 implements Migration {
public String getDescription() {
return "Updates mapping for an index \"event\" with prefix \"context\"
by default. Adds the \"sourceId\" field and copies value "
+ "from the \"scope\" field to it."
- + "Creates the scope entries in the index \"scope\" from the
existing sopes of the events";
+ + "Creates the scope entries in the index \"scope\" from the
existing scopes of the events. "
+ + "Creates the \"profileAlias\" documents based on
\"profile\".";
}
@Override
@@ -86,6 +91,7 @@ public class MigrationTo200 implements Migration {
}
createScopeMapping(indexPrefix);
createScopes(getSetOfScopes(indexes), indexPrefix);
+ createProfileAliasDocumentsFromProfile();
}
private void updateMapping(final String indexName) throws IOException {
@@ -246,6 +252,125 @@ public class MigrationTo200 implements Migration {
return scopes;
}
+ private void createProfileAliasDocumentsFromProfile() throws IOException {
+ System.out.println("Migration \"Create profileAlias from profile\"
started");
+ Instant migrationTime = Instant.now();
+ int size = 1000;
+ doProcessProfiles(migrationTime, size);
+ System.out.println("Migration \"Create profileAlias from profile\"
completed.");
+ }
+
+ private void doProcessProfiles(Instant migrationTime, int offset) throws
IOException {
+ CloseableHttpResponse response = null;
+ try {
+ response = httpClient.execute(createSearchRequest(offset));
+
+ while (true) {
+ JSONObject responseAsJson = getResponseAsJSON(response);
+ String scrollId = responseAsJson.getString("_scroll_id");
+ JSONArray hits = getProfileHits(responseAsJson);
+
+ if (hits.length() == 0) {
+ if (scrollId != null) {
+ CloseableHttpResponse deleteScrollResponse =
httpClient.execute(createDeleteScrollRequest(scrollId));
+ if (deleteScrollResponse != null) {
+ deleteScrollResponse.close();
+ }
+ }
+ break;
+ }
+
+ StringBuilder bulkCreateRequest = new StringBuilder();
+ for (Object o : hits) {
+ JSONObject hit = (JSONObject) o;
+ if (hit.has("_source")) {
+ JSONObject profile = hit.getJSONObject("_source");
+ if (profile.has("itemId")) {
+ String itemId = profile.getString("itemId");
+ String bulkSaveProfileAliases =
resourceAsString("requestBody/bulkSaveProfileAliases.ndjson");
+ bulkCreateRequest.append(bulkSaveProfileAliases.
+ replace("$itemId", itemId).
+ replace("$migrationTime",
migrationTime.toString()));
+ }
+ }
+ }
+
+ CloseableHttpResponse bulkResponse =
httpClient.execute(createProfileAliasRequest(bulkCreateRequest.toString()));
+ if (bulkResponse != null) {
+ bulkResponse.close();
+ }
+
+ response =
httpClient.execute(createSearchRequestWithScrollId(scrollId));
+ }
+ } finally {
+ if (response != null) {
+ response.close();
+ }
+ }
+ }
+
+ private JSONObject getResponseAsJSON(CloseableHttpResponse response)
throws IOException {
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+ return new JSONObject(EntityUtils.toString(response.getEntity()));
+ }
+ return new JSONObject();
+ }
+
+ private JSONArray getProfileHits(JSONObject responseAsJson) {
+ if (responseAsJson.has("hits")) {
+ JSONObject hitsObject = responseAsJson.getJSONObject("hits");
+ if (hitsObject.has("hits")) {
+ return hitsObject.getJSONArray("hits");
+ }
+ }
+ return new JSONArray();
+ }
+
+ private HttpPost createSearchRequestWithScrollId(final String scrollId)
throws IOException {
+ final String requestBody = "{\n" +
+ " \"scroll_id\": \"" + scrollId + "\",\n" +
+ " \"scroll\": \"1h\"\n" +
+ "}";
+
+ final HttpPost request = new HttpPost(esAddress + "/_search/scroll");
+
+ request.addHeader("Accept", "application/json");
+ request.addHeader("Content-Type", "application/json");
+ request.setEntity(new StringEntity(requestBody));
+
+ return request;
+ }
+
+ private HttpGet createSearchRequest(int size) {
+ return new HttpGet(esAddress +
"/context-profile/_search?&scroll=1h&_source_includes=itemId&size=" + size);
+ }
+
+ private HttpEntityEnclosingRequestBase createDeleteScrollRequest(final
String scrollId) throws IOException {
+ final HttpEntityEnclosingRequestBase deleteRequest = new
HttpEntityEnclosingRequestBase() {
+ @Override
+ public String getMethod() {
+ return "DELETE";
+ }
+ };
+
+ deleteRequest.setURI(URI.create(esAddress + "/_search/scroll"));
+ deleteRequest.setEntity(new StringEntity("{ \"scroll_id\": \"" +
scrollId + "\" }"));
+ deleteRequest.addHeader("Accept", "application/json");
+ deleteRequest.addHeader("Content-Type", "application/json");
+
+ return deleteRequest;
+ }
+
+ private HttpPost createProfileAliasRequest(String bulkRequestAsString)
throws IOException {
+ final HttpPost bulkRequest = new HttpPost(esAddress +
"/context-profilealias/_bulk");
+
+ bulkRequest.addHeader("Accept", "application/json");
+ bulkRequest.addHeader("Content-Type", "application/json");
+ bulkRequest.setEntity(new StringEntity(bulkRequestAsString));
+
+ return bulkRequest;
+ }
+
protected String resourceAsString(final String resource) {
final URL url = bundleContext.getBundle().getResource(resource);
try (InputStream stream = url.openStream()) {
diff --git
a/tools/shell-commands/src/main/resources/requestBody/bulkSaveProfileAliases.ndjson
b/tools/shell-commands/src/main/resources/requestBody/bulkSaveProfileAliases.ndjson
new file mode 100644
index 000000000..ad886ca35
--- /dev/null
+++
b/tools/shell-commands/src/main/resources/requestBody/bulkSaveProfileAliases.ndjson
@@ -0,0 +1,2 @@
+{ "create" : { "_id": "$itemId" }}
+{ "itemId": "$itemId", "itemType": "profileAlias", "profileID": "$itemId",
"scope": null, "clientID": "defaultClientId", "creationTime":
"$migrationTime", "modifiedTime": "$migrationTime"}