refactor into observable
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7976e1d5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7976e1d5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7976e1d5 Branch: refs/heads/two-dot-o-dev Commit: 7976e1d577d9ba3d5f096db33b6984fa5226b7eb Parents: 2df4013 Author: Shawn Feldman <[email protected]> Authored: Thu Mar 26 14:38:42 2015 -0600 Committer: Shawn Feldman <[email protected]> Committed: Thu Mar 26 14:38:42 2015 -0600 ---------------------------------------------------------------------- .../migration/EsIndexDataMigrationImpl.java | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7976e1d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java index 64b7d29..13e6526 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java @@ -33,6 +33,7 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder import org.elasticsearch.client.AdminClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rx.Observable; import java.util.ArrayList; import java.util.List; @@ -64,21 +65,22 @@ public class EsIndexDataMigrationImpl implements DataMigration<ApplicationScope> @Override public int migrate(int currentVersion, MigrationDataProvider<ApplicationScope> migrationDataProvider, ProgressObserver observer) { final AtomicInteger integer = new AtomicInteger(); - migrationDataProvider.getData().doOnNext(applicationScope -> { - LegacyIndexIdentifier legacyIndexIdentifier = new LegacyIndexIdentifier(indexFig,applicationScope); - String[] indexes = indexCache.getIndexes(legacyIndexIdentifier.getAlias(), AliasedEntityIndex.AliasType.Read); - AdminClient adminClient = provider.getClient().admin(); + final AdminClient adminClient = provider.getClient().admin(); - for (String index : indexes) { + migrationDataProvider.getData().flatMap(applicationScope -> { + LegacyIndexIdentifier legacyIndexIdentifier = new LegacyIndexIdentifier(indexFig, applicationScope); + String[] indexes = indexCache.getIndexes(legacyIndexIdentifier.getAlias(), AliasedEntityIndex.AliasType.Read); + return Observable.from(indexes); + }) + .doOnNext(index -> { IndicesAliasesRequestBuilder aliasesRequestBuilder = adminClient.indices().prepareAliases(); aliasesRequestBuilder = adminClient.indices().prepareAliases(); // add read alias aliasesRequestBuilder.addAlias(index, indexIdentifier.getAlias().getReadAlias()); integer.incrementAndGet(); - } - }) - .doOnError(error -> log.error("failed to migrate index",error)) - .toBlocking().last(); + }) + .doOnError(error -> log.error("failed to migrate index", error)) + .toBlocking().lastOrDefault(null); return dataVersion.getImplementationVersion(); }
