This is an automated email from the ASF dual-hosted git repository. asi pushed a commit to branch UNOMI-505 in repository https://gitbox.apache.org/repos/asf/unomi.git
commit dd5c75d49fdb57e9f328adc2290aabffbb224fb3 Author: Anatol Sialitski <[email protected]> AuthorDate: Thu May 19 12:21:00 2022 +0200 UNOMI-505 Study replication of existing profileIDs into new alias index --- .../unomi/shell/migration/impl/MigrationTo200.java | 114 ++++++++++++++++++++- 1 file changed, 110 insertions(+), 4 deletions(-) diff --git a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java index 94dfd40be..3ea172a4f 100644 --- a/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java +++ b/tools/shell-commands/src/main/java/org/apache/unomi/shell/migration/impl/MigrationTo200.java @@ -18,20 +18,25 @@ package org.apache.unomi.shell.migration.impl; import org.apache.http.HttpStatus; import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.util.EntityUtils; import org.apache.karaf.shell.api.console.Session; import org.apache.unomi.shell.migration.Migration; +import org.json.JSONArray; import org.json.JSONObject; import org.osgi.framework.BundleContext; import org.osgi.framework.Version; import org.osgi.service.component.annotations.Component; import java.io.IOException; +import java.net.URI; +import java.time.Instant; import java.util.Set; import java.util.stream.Collectors; @@ -63,10 +68,11 @@ public class MigrationTo200 implements Migration { this.session = session; this.esAddress = esAddress; - doExecute(); + doMigrate_Copy_Scope_to_SourceId(); + doMigrate_Create_ProfileAlias_From_Profile(); } - private void doExecute() throws IOException { + private void doMigrate_Copy_Scope_to_SourceId() throws IOException { try (CloseableHttpResponse response = httpClient.execute(new HttpGet(esAddress + "/_aliases"))) { if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { @@ -77,13 +83,13 @@ public class MigrationTo200 implements Migration { collect(Collectors.toSet()); for (String indexName : indices) { - updateMapping(indexName, httpClient); + updateMapping(indexName); } } } } - private void updateMapping(final String indexName, final CloseableHttpClient httpClient) throws IOException { + private void updateMapping(final String indexName) throws IOException { HttpPut httpPut = new HttpPut(esAddress + "/" + indexName + "/_mapping"); httpPut.addHeader("Accept", "application/json"); @@ -147,4 +153,104 @@ public class MigrationTo200 implements Migration { } } + private void doMigrate_Create_ProfileAlias_From_Profile() throws IOException { + Instant migrationTime = Instant.now(); + int initialOffset = 1000; + int size = 1000; + doProcessResponse(migrationTime, initialOffset, size, null); + } + + private void doProcessResponse(Instant migrationTime, int initialOffset, int offset, String scrollId) throws IOException { + HttpUriRequest request = scrollId == null + ? createSearchRequest(offset) + : createSearchRequestWithScrollId(scrollId); + + try (CloseableHttpResponse response = httpClient.execute(request)) { + if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + JSONObject responseAsJson = new JSONObject(EntityUtils.toString(response.getEntity())); + + if (responseAsJson.has("hits")) { + JSONObject hitsObject = responseAsJson.getJSONObject("hits"); + if (hitsObject.has("hits")) { + StringBuilder bulkCreateRequest = new StringBuilder(); + JSONArray hits = hitsObject.getJSONArray("hits"); + for (Object o : hits) { + JSONObject hit = (JSONObject) o; + if (hit.has("_source")) { + JSONObject hitSource = hit.getJSONObject("_source"); + if (hitSource.has("itemId")) { + String itemId = hitSource.getString("itemId"); + bulkCreateRequest.append("{\"create\":{\"_id\":\"").append(itemId).append("\"}}\n"). + append("{\"itemId\": \"").append(itemId).append("\", "). + append("\"itemType\": \"profileAlias\", "). + append("\"profileID\": \"").append(itemId).append("\", "). + append("\"scope\": null, "). + append("\"clientID\": \"defaultClientId\", "). + append("\"creationTime\": \"").append(migrationTime.toString()).append("\", "). + append("\"modifiedTime\": \"").append(migrationTime.toString()).append("\" "). + append("}\n"); + } + } + } + + httpClient.execute(createProfileAliasRequest(bulkCreateRequest.toString())); + } + + if (hitsObject.getJSONObject("total").getInt("value") > offset) { + doProcessResponse(migrationTime, initialOffset, offset + initialOffset, responseAsJson.getString("_scroll_id")); + } else { + if (scrollId != null) { + httpClient.execute(createDeleteScrollRequest(scrollId)); + } + } + } + } + } + } + + private HttpPost createSearchRequestWithScrollId(final String scrollId) throws IOException { + final String requestBody = "{\n" + + " \"scroll_id\": \"" + scrollId + "\",\n" + + " \"scroll\": \"1h\"\n" + + "}"; + + final HttpPost request = new HttpPost(esAddress + "/_search/scroll"); + + request.addHeader("Accept", "application/json"); + request.addHeader("Content-Type", "application/json"); + request.setEntity(new StringEntity(requestBody)); + + return request; + } + + private HttpGet createSearchRequest(int size) { + return new HttpGet(esAddress + "/context-profile/_search?&scroll=1h&_source_includes=itemId&size=" + size); + } + + private HttpEntityEnclosingRequestBase createDeleteScrollRequest(final String scrollId) throws IOException { + final HttpEntityEnclosingRequestBase deleteRequest = new HttpEntityEnclosingRequestBase() { + @Override + public String getMethod() { + return "DELETE"; + } + }; + + deleteRequest.setURI(URI.create(esAddress + "/_search/scroll")); + deleteRequest.setEntity(new StringEntity("{ \"scroll_id\": \"" + scrollId + "\" }")); + deleteRequest.addHeader("Accept", "application/json"); + deleteRequest.addHeader("Content-Type", "application/json"); + + return deleteRequest; + } + + private HttpPost createProfileAliasRequest(String bulkRequestAsString) throws IOException { + final HttpPost bulkRequest = new HttpPost(esAddress + "/context-profilealias/_bulk"); + + bulkRequest.addHeader("Accept", "application/json"); + bulkRequest.addHeader("Content-Type", "application/json"); + bulkRequest.setEntity(new StringEntity(bulkRequestAsString)); + + return bulkRequest; + } + }
