Repository: ignite Updated Branches: refs/heads/master 388f7ffc4 -> 7d4e1fd11
http://git-wip-us.apache.org/repos/asf/ignite/blob/7d4e1fd1/modules/web-console/frontend/app/components/page-configure/store/effects.js ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/app/components/page-configure/store/effects.js b/modules/web-console/frontend/app/components/page-configure/store/effects.js index 014076b..ac807dc 100644 --- a/modules/web-console/frontend/app/components/page-configure/store/effects.js +++ b/modules/web-console/frontend/app/components/page-configure/store/effects.js @@ -15,23 +15,8 @@ * limitations under the License. */ -import {Observable} from 'rxjs/Observable'; -import 'rxjs/add/operator/ignoreElements'; -import 'rxjs/add/operator/let'; -import 'rxjs/add/operator/exhaustMap'; -import 'rxjs/add/operator/switchMap'; -import 'rxjs/add/operator/map'; -import 'rxjs/add/operator/pluck'; -import 'rxjs/add/operator/withLatestFrom'; -import 'rxjs/add/operator/merge'; -import 'rxjs/add/operator/take'; -import 'rxjs/add/operator/catch'; -import 'rxjs/add/operator/zip'; -import {merge} from 'rxjs/observable/merge'; -import {empty} from 'rxjs/observable/empty'; -import {of} from 'rxjs/observable/of'; -import {from} from 'rxjs/observable/from'; -import {fromPromise} from 'rxjs/observable/fromPromise'; +import {merge, empty, of, from} from 'rxjs'; +import {mapTo, filter, tap, ignoreElements, exhaustMap, switchMap, map, pluck, withLatestFrom, take, catchError, zip} from 'rxjs/operators'; import uniq from 'lodash/uniq'; import {uniqueName} from 'app/utils/uniqueName'; import {defaultNames} from '../defaultNames'; @@ -81,7 +66,7 @@ import Models from 'app/services/Models'; import IGFSs from 'app/services/IGFSs'; import {Confirm} from 'app/services/Confirm.service'; -export const ofType = (type) => (s) => s.filter((a) => a.type === type); +export const ofType = (type) => (s) => s.pipe(filter((a) => a.type === type)); export default class ConfigEffects { static $inject = [ @@ -124,35 +109,37 @@ export default class ConfigEffects { this.Confirm = Confirm; this.configurationDownload = ConfigurationDownload; - this.loadConfigurationEffect$ = this.ConfigureState.actions$ - .let(ofType('LOAD_COMPLETE_CONFIGURATION')) - .exhaustMap((action) => { - return fromPromise(this.Clusters.getConfiguration(action.clusterID)) - .switchMap(({data}) => of( + this.loadConfigurationEffect$ = this.ConfigureState.actions$.pipe( + ofType('LOAD_COMPLETE_CONFIGURATION'), + exhaustMap((action) => { + return from(this.Clusters.getConfiguration(action.clusterID)).pipe( + switchMap(({data}) => of( completeConfiguration(data), {type: 'LOAD_COMPLETE_CONFIGURATION_OK', data} - )) - .catch((error) => of({ + )), + catchError((error) => of({ type: 'LOAD_COMPLETE_CONFIGURATION_ERR', error: { message: `Failed to load cluster configuration: ${error.data}.` }, action - })); - }); + }))); + }) + ); - this.storeConfigurationEffect$ = this.ConfigureState.actions$ - .let(ofType(COMPLETE_CONFIGURATION)) - .exhaustMap(({configuration: {cluster, caches, models, igfss}}) => of(...[ + this.storeConfigurationEffect$ = this.ConfigureState.actions$.pipe( + ofType(COMPLETE_CONFIGURATION), + exhaustMap(({configuration: {cluster, caches, models, igfss}}) => of(...[ cluster && {type: clustersActionTypes.UPSERT, items: [cluster]}, caches && caches.length && {type: cachesActionTypes.UPSERT, items: caches}, models && models.length && {type: modelsActionTypes.UPSERT, items: models}, igfss && igfss.length && {type: igfssActionTypes.UPSERT, items: igfss} - ].filter((v) => v))); + ].filter((v) => v))) + ); - this.saveCompleteConfigurationEffect$ = this.ConfigureState.actions$ - .let(ofType(ADVANCED_SAVE_COMPLETE_CONFIGURATION)) - .switchMap((action) => { + this.saveCompleteConfigurationEffect$ = this.ConfigureState.actions$.pipe( + ofType(ADVANCED_SAVE_COMPLETE_CONFIGURATION), + switchMap((action) => { const actions = [ { type: modelsActionTypes.UPSERT, @@ -188,62 +175,68 @@ export default class ConfigEffects { } ].filter((a) => a.items.length); - return of(...actions) - .merge( - fromPromise(Clusters.saveAdvanced(action.changedItems)) - .switchMap((res) => { - return of( - {type: 'EDIT_CLUSTER', cluster: action.changedItems.cluster}, - {type: 'ADVANCED_SAVE_COMPLETE_CONFIGURATION_OK', changedItems: action.changedItems} - ); - }) - .catch((res) => { - return of({ - type: 'ADVANCED_SAVE_COMPLETE_CONFIGURATION_ERR', - changedItems: action.changedItems, - action, - error: { - message: `Failed to save cluster "${action.changedItems.cluster.name}": ${res.data}.` - } - }, { - type: 'UNDO_ACTIONS', - actions - }); - }) + return merge( + of(...actions), + from(Clusters.saveAdvanced(action.changedItems)).pipe( + switchMap((res) => { + return of( + {type: 'EDIT_CLUSTER', cluster: action.changedItems.cluster}, + {type: 'ADVANCED_SAVE_COMPLETE_CONFIGURATION_OK', changedItems: action.changedItems} + ); + }), + catchError((res) => { + return of({ + type: 'ADVANCED_SAVE_COMPLETE_CONFIGURATION_ERR', + changedItems: action.changedItems, + action, + error: { + message: `Failed to save cluster "${action.changedItems.cluster.name}": ${res.data}.` + } + }, { + type: 'UNDO_ACTIONS', + actions + }); + }) + ) ); - }); - - this.addCacheToEditEffect$ = this.ConfigureState.actions$ - .let(ofType('ADD_CACHE_TO_EDIT')) - .switchMap(() => this.ConfigureState.state$.let(this.ConfigSelectors.selectCacheToEdit('new')).take(1)) - .map((cache) => ({type: 'UPSERT_CLUSTER_ITEM', itemType: 'caches', item: cache})); - - this.errorNotificationsEffect$ = this.ConfigureState.actions$ - .filter((a) => a.error) - .do((action) => this.IgniteMessages.showError(action.error)) - .ignoreElements(); - - this.loadUserClustersEffect$ = this.ConfigureState.actions$ - .let(ofType('LOAD_USER_CLUSTERS')) - .exhaustMap((a) => { - return fromPromise(this.Clusters.getClustersOverview()) - .switchMap(({data}) => of( + }) + ); + + this.addCacheToEditEffect$ = this.ConfigureState.actions$.pipe( + ofType('ADD_CACHE_TO_EDIT'), + switchMap(() => this.ConfigureState.state$.pipe(this.ConfigSelectors.selectCacheToEdit('new'), take(1))), + map((cache) => ({type: 'UPSERT_CLUSTER_ITEM', itemType: 'caches', item: cache})) + ); + + this.errorNotificationsEffect$ = this.ConfigureState.actions$.pipe( + filter((a) => a.error), + tap((action) => this.IgniteMessages.showError(action.error)), + ignoreElements() + ); + + this.loadUserClustersEffect$ = this.ConfigureState.actions$.pipe( + ofType('LOAD_USER_CLUSTERS'), + exhaustMap((a) => { + return from(this.Clusters.getClustersOverview()).pipe( + switchMap(({data}) => of( {type: shortClustersActionTypes.SET, items: data}, {type: `${a.type}_OK`} - )) - .catch((error) => of({ + )), + catchError((error) => of({ type: `${a.type}_ERR`, error: { message: `Failed to load clusters: ${error.data}` }, action: a - })); - }); + })) + ); + }) + ); - this.loadAndEditClusterEffect$ = ConfigureState.actions$ - .let(ofType('LOAD_AND_EDIT_CLUSTER')) - .withLatestFrom(this.ConfigureState.state$.let(this.ConfigSelectors.selectShortClustersValue())) - .exhaustMap(([a, shortClusters]) => { + this.loadAndEditClusterEffect$ = ConfigureState.actions$.pipe( + ofType('LOAD_AND_EDIT_CLUSTER'), + withLatestFrom(this.ConfigureState.state$.pipe(this.ConfigSelectors.selectShortClustersValue())), + exhaustMap(([a, shortClusters]) => { if (a.clusterID === 'new') { return of( { @@ -256,226 +249,268 @@ export default class ConfigEffects { {type: 'LOAD_AND_EDIT_CLUSTER_OK'} ); } - return this.ConfigureState.state$.let(this.ConfigSelectors.selectCluster(a.clusterID)).take(1) - .switchMap((cluster) => { + return this.ConfigureState.state$.pipe( + this.ConfigSelectors.selectCluster(a.clusterID), + take(1), + switchMap((cluster) => { if (cluster) { return of( {type: 'EDIT_CLUSTER', cluster}, {type: 'LOAD_AND_EDIT_CLUSTER_OK'} ); } - return from(this.Clusters.getCluster(a.clusterID)) - .switchMap(({data}) => of( + return from(this.Clusters.getCluster(a.clusterID)).pipe( + switchMap(({data}) => of( {type: clustersActionTypes.UPSERT, items: [data]}, {type: 'EDIT_CLUSTER', cluster: data}, {type: 'LOAD_AND_EDIT_CLUSTER_OK'} - )) - .catch((error) => of({ + )), + catchError((error) => of({ type: 'LOAD_AND_EDIT_CLUSTER_ERR', error: { message: `Failed to load cluster: ${error.data}.` } - })); - }); - }); - - this.loadCacheEffect$ = this.ConfigureState.actions$ - .let(ofType('LOAD_CACHE')) - .exhaustMap((a) => { - return this.ConfigureState.state$.let(this.ConfigSelectors.selectCache(a.cacheID)).take(1) - .switchMap((cache) => { + })) + ); + }) + ); + }) + ); + + this.loadCacheEffect$ = this.ConfigureState.actions$.pipe( + ofType('LOAD_CACHE'), + exhaustMap((a) => { + return this.ConfigureState.state$.pipe( + this.ConfigSelectors.selectCache(a.cacheID), + take(1), + switchMap((cache) => { if (cache) return of({type: `${a.type}_OK`, cache}); - return fromPromise(this.Caches.getCache(a.cacheID)) - .switchMap(({data}) => of( + return from(this.Caches.getCache(a.cacheID)).pipe( + switchMap(({data}) => of( {type: 'CACHE', cache: data}, {type: `${a.type}_OK`, cache: data} - )); - }) - .catch((error) => of({ + )) + ); + }), + catchError((error) => of({ type: `${a.type}_ERR`, error: { message: `Failed to load cache: ${error.data}.` } - })); - }); + })) + ); + }) + ); - this.storeCacheEffect$ = this.ConfigureState.actions$ - .let(ofType('CACHE')) - .map((a) => ({type: cachesActionTypes.UPSERT, items: [a.cache]})); + this.storeCacheEffect$ = this.ConfigureState.actions$.pipe( + ofType('CACHE'), + map((a) => ({type: cachesActionTypes.UPSERT, items: [a.cache]})) + ); - this.loadShortCachesEffect$ = ConfigureState.actions$ - .let(ofType('LOAD_SHORT_CACHES')) - .exhaustMap((a) => { + this.loadShortCachesEffect$ = ConfigureState.actions$.pipe( + ofType('LOAD_SHORT_CACHES'), + exhaustMap((a) => { if (!(a.ids || []).length) return of({type: `${a.type}_OK`}); - return this.ConfigureState.state$.let(this.ConfigSelectors.selectShortCaches()).take(1) - .switchMap((items) => { + return this.ConfigureState.state$.pipe( + this.ConfigSelectors.selectShortCaches(), + take(1), + switchMap((items) => { if (!items.pristine && a.ids && a.ids.every((_id) => items.value.has(_id))) return of({type: `${a.type}_OK`}); - return fromPromise(this.Clusters.getClusterCaches(a.clusterID)) - .switchMap(({data}) => of( + return from(this.Clusters.getClusterCaches(a.clusterID)).pipe( + switchMap(({data}) => of( {type: shortCachesActionTypes.UPSERT, items: data}, {type: `${a.type}_OK`} - )); - }) - .catch((error) => of({ + )) + ); + }), + catchError((error) => of({ type: `${a.type}_ERR`, error: { message: `Failed to load caches: ${error.data}.` }, action: a - })); - }); - - this.loadIgfsEffect$ = this.ConfigureState.actions$ - .let(ofType('LOAD_IGFS')) - .exhaustMap((a) => { - return this.ConfigureState.state$.let(this.ConfigSelectors.selectIGFS(a.igfsID)).take(1) - .switchMap((igfs) => { + })) + ); + }) + ); + + this.loadIgfsEffect$ = this.ConfigureState.actions$.pipe( + ofType('LOAD_IGFS'), + exhaustMap((a) => { + return this.ConfigureState.state$.pipe( + this.ConfigSelectors.selectIGFS(a.igfsID), + take(1), + switchMap((igfs) => { if (igfs) return of({type: `${a.type}_OK`, igfs}); - return fromPromise(this.IGFSs.getIGFS(a.igfsID)) - .switchMap(({data}) => of( + return from(this.IGFSs.getIGFS(a.igfsID)).pipe( + switchMap(({data}) => of( {type: 'IGFS', igfs: data}, {type: `${a.type}_OK`, igfs: data} - )); - }) - .catch((error) => of({ + )) + ); + }), + catchError((error) => of({ type: `${a.type}_ERR`, error: { message: `Failed to load IGFS: ${error.data}.` } - })); - }); + })) + ); + }) + ); - this.storeIgfsEffect$ = this.ConfigureState.actions$ - .let(ofType('IGFS')) - .map((a) => ({type: igfssActionTypes.UPSERT, items: [a.igfs]})); + this.storeIgfsEffect$ = this.ConfigureState.actions$.pipe( + ofType('IGFS'), + map((a) => ({type: igfssActionTypes.UPSERT, items: [a.igfs]})) + ); - this.loadShortIgfssEffect$ = ConfigureState.actions$ - .let(ofType('LOAD_SHORT_IGFSS')) - .exhaustMap((a) => { + this.loadShortIgfssEffect$ = ConfigureState.actions$.pipe( + ofType('LOAD_SHORT_IGFSS'), + exhaustMap((a) => { if (!(a.ids || []).length) { return of( {type: shortIGFSsActionTypes.UPSERT, items: []}, {type: `${a.type}_OK`} ); } - return this.ConfigureState.state$.let(this.ConfigSelectors.selectShortIGFSs()).take(1) - .switchMap((items) => { + return this.ConfigureState.state$.pipe( + this.ConfigSelectors.selectShortIGFSs(), + take(1), + switchMap((items) => { if (!items.pristine && a.ids && a.ids.every((_id) => items.value.has(_id))) return of({type: `${a.type}_OK`}); - return fromPromise(this.Clusters.getClusterIGFSs(a.clusterID)) - .switchMap(({data}) => of( + return from(this.Clusters.getClusterIGFSs(a.clusterID)).pipe( + switchMap(({data}) => of( {type: shortIGFSsActionTypes.UPSERT, items: data}, {type: `${a.type}_OK`} - )); - }) - .catch((error) => of({ + )) + ); + }), + catchError((error) => of({ type: `${a.type}_ERR`, error: { message: `Failed to load IGFSs: ${error.data}.` }, action: a - })); - }); - - this.loadModelEffect$ = this.ConfigureState.actions$ - .let(ofType('LOAD_MODEL')) - .exhaustMap((a) => { - return this.ConfigureState.state$.let(this.ConfigSelectors.selectModel(a.modelID)).take(1) - .switchMap((model) => { + })) + ); + }) + ); + + this.loadModelEffect$ = this.ConfigureState.actions$.pipe( + ofType('LOAD_MODEL'), + exhaustMap((a) => { + return this.ConfigureState.state$.pipe( + this.ConfigSelectors.selectModel(a.modelID), + take(1), + switchMap((model) => { if (model) return of({type: `${a.type}_OK`, model}); - return fromPromise(this.Models.getModel(a.modelID)) - .switchMap(({data}) => of( + return from(this.Models.getModel(a.modelID)).pipe( + switchMap(({data}) => of( {type: 'MODEL', model: data}, {type: `${a.type}_OK`, model: data} - )); - }) - .catch((error) => of({ + )) + ); + }), + catchError((error) => of({ type: `${a.type}_ERR`, error: { message: `Failed to load domain model: ${error.data}.` } - })); - }); + })) + ); + }) + ); - this.storeModelEffect$ = this.ConfigureState.actions$ - .let(ofType('MODEL')) - .map((a) => ({type: modelsActionTypes.UPSERT, items: [a.model]})); + this.storeModelEffect$ = this.ConfigureState.actions$.pipe( + ofType('MODEL'), + map((a) => ({type: modelsActionTypes.UPSERT, items: [a.model]})) + ); - this.loadShortModelsEffect$ = this.ConfigureState.actions$ - .let(ofType('LOAD_SHORT_MODELS')) - .exhaustMap((a) => { + this.loadShortModelsEffect$ = this.ConfigureState.actions$.pipe( + ofType('LOAD_SHORT_MODELS'), + exhaustMap((a) => { if (!(a.ids || []).length) { return of( {type: shortModelsActionTypes.UPSERT, items: []}, {type: `${a.type}_OK`} ); } - return this.ConfigureState.state$.let(this.ConfigSelectors.selectShortModels()).take(1) - .switchMap((items) => { + return this.ConfigureState.state$.pipe( + this.ConfigSelectors.selectShortModels(), + take(1), + switchMap((items) => { if (!items.pristine && a.ids && a.ids.every((_id) => items.value.has(_id))) return of({type: `${a.type}_OK`}); - return fromPromise(this.Clusters.getClusterModels(a.clusterID)) - .switchMap(({data}) => of( + return from(this.Clusters.getClusterModels(a.clusterID)).pipe( + switchMap(({data}) => of( {type: shortModelsActionTypes.UPSERT, items: data}, {type: `${a.type}_OK`} - )); - }) - .catch((error) => of({ + )) + ); + }), + catchError((error) => of({ type: `${a.type}_ERR`, error: { message: `Failed to load domain models: ${error.data}.` }, action: a - })); - }); - - this.basicSaveRedirectEffect$ = this.ConfigureState.actions$ - .let(ofType(BASIC_SAVE_OK)) - .do((a) => this.$state.go('base.configuration.edit.basic', {clusterID: a.changedItems.cluster._id}, {location: 'replace', custom: {justIDUpdate: true}})) - .ignoreElements(); - - this.basicDownloadAfterSaveEffect$ = this.ConfigureState.actions$.let(ofType(BASIC_SAVE_AND_DOWNLOAD)) - .zip(this.ConfigureState.actions$.let(ofType(BASIC_SAVE_OK))) - .pluck('1') - .do((a) => this.configurationDownload.downloadClusterConfiguration(a.changedItems.cluster)) - .ignoreElements(); + })) + ); + }) + ); + + this.basicSaveRedirectEffect$ = this.ConfigureState.actions$.pipe( + ofType(BASIC_SAVE_OK), + tap((a) => this.$state.go('base.configuration.edit.basic', {clusterID: a.changedItems.cluster._id}, {location: 'replace', custom: {justIDUpdate: true}})), + ignoreElements() + ); + + this.basicDownloadAfterSaveEffect$ = this.ConfigureState.actions$.pipe( + ofType(BASIC_SAVE_AND_DOWNLOAD), + zip(this.ConfigureState.actions$.pipe(ofType(BASIC_SAVE_OK))), + pluck('1'), + tap((a) => this.configurationDownload.downloadClusterConfiguration(a.changedItems.cluster)), + ignoreElements() + ); this.advancedDownloadAfterSaveEffect$ = merge( - this.ConfigureState.actions$.let(ofType(ADVANCED_SAVE_CLUSTER)), - this.ConfigureState.actions$.let(ofType(ADVANCED_SAVE_CACHE)), - this.ConfigureState.actions$.let(ofType(ADVANCED_SAVE_IGFS)), - this.ConfigureState.actions$.let(ofType(ADVANCED_SAVE_MODEL)), - ) - .filter((a) => a.download) - .zip(this.ConfigureState.actions$.let(ofType('ADVANCED_SAVE_COMPLETE_CONFIGURATION_OK'))) - .pluck('1') - .do((a) => this.configurationDownload.downloadClusterConfiguration(a.changedItems.cluster)) - .ignoreElements(); - - this.advancedSaveRedirectEffect$ = this.ConfigureState.actions$ - .let(ofType('ADVANCED_SAVE_COMPLETE_CONFIGURATION_OK')) - .withLatestFrom(this.ConfigureState.actions$.let(ofType(ADVANCED_SAVE_COMPLETE_CONFIGURATION))) - .pluck('1', 'changedItems') - .map((req) => { + this.ConfigureState.actions$.pipe(ofType(ADVANCED_SAVE_CLUSTER)), + this.ConfigureState.actions$.pipe(ofType(ADVANCED_SAVE_CACHE)), + this.ConfigureState.actions$.pipe(ofType(ADVANCED_SAVE_IGFS)), + this.ConfigureState.actions$.pipe(ofType(ADVANCED_SAVE_MODEL)), + ).pipe( + filter((a) => a.download), + zip(this.ConfigureState.actions$.pipe(ofType('ADVANCED_SAVE_COMPLETE_CONFIGURATION_OK'))), + pluck('1'), + tap((a) => this.configurationDownload.downloadClusterConfiguration(a.changedItems.cluster)), + ignoreElements() + ); + + this.advancedSaveRedirectEffect$ = this.ConfigureState.actions$.pipe( + ofType('ADVANCED_SAVE_COMPLETE_CONFIGURATION_OK'), + withLatestFrom(this.ConfigureState.actions$.pipe(ofType(ADVANCED_SAVE_COMPLETE_CONFIGURATION))), + pluck('1', 'changedItems'), + map((req) => { const firstChangedItem = Object.keys(req).filter((k) => k !== 'cluster') .map((k) => Array.isArray(req[k]) ? [k, req[k][0]] : [k, req[k]]) .filter((v) => v[1]) .pop(); return firstChangedItem ? [...firstChangedItem, req.cluster] : ['cluster', req.cluster, req.cluster]; - }) - .do(([type, value, cluster]) => { + }), + tap(([type, value, cluster]) => { const go = (state, params = {}) => this.$state.go( state, {...params, clusterID: cluster._id}, {location: 'replace', custom: {justIDUpdate: true}} ); @@ -523,66 +558,76 @@ export default class ConfigEffects { default: break; } - }) - .ignoreElements(); + }), + ignoreElements() + ); - this.removeClusterItemsEffect$ = this.ConfigureState.actions$ - .let(ofType(REMOVE_CLUSTER_ITEMS)) - .exhaustMap((a) => { + this.removeClusterItemsEffect$ = this.ConfigureState.actions$.pipe( + ofType(REMOVE_CLUSTER_ITEMS), + exhaustMap((a) => { return a.confirm // TODO: list items to remove in confirmation - ? fromPromise(this.Confirm.confirm('Are you sure you want to remove these items?')) - .mapTo(a) - .catch(() => empty()) + ? from(this.Confirm.confirm('Are you sure you want to remove these items?')).pipe( + mapTo(a), + catchError(() => empty()) + ) : of(a); - }) - .map((a) => removeClusterItemsConfirmed(a.clusterID, a.itemType, a.itemIDs)); - - this.persistRemovedClusterItemsEffect$ = this.ConfigureState.actions$ - .let(ofType(REMOVE_CLUSTER_ITEMS_CONFIRMED)) - .withLatestFrom(this.ConfigureState.actions$.let(ofType(REMOVE_CLUSTER_ITEMS))) - .filter(([a, b]) => { + }), + map((a) => removeClusterItemsConfirmed(a.clusterID, a.itemType, a.itemIDs)) + ); + + this.persistRemovedClusterItemsEffect$ = this.ConfigureState.actions$.pipe( + ofType(REMOVE_CLUSTER_ITEMS_CONFIRMED), + withLatestFrom(this.ConfigureState.actions$.pipe(ofType(REMOVE_CLUSTER_ITEMS))), + filter(([a, b]) => { return a.itemType === b.itemType && b.save && JSON.stringify(a.itemIDs) === JSON.stringify(b.itemIDs); - }) - .pluck('0') - .withLatestFrom(this.ConfigureState.state$.pluck('edit')) - .map(([action, edit]) => advancedSaveCompleteConfiguration(edit)); - - this.confirmClustersRemovalEffect$ = this.ConfigureState.actions$ - .let(ofType(CONFIRM_CLUSTERS_REMOVAL)) - .pluck('clusterIDs') - .switchMap((ids) => this.ConfigureState.state$.let(this.ConfigSelectors.selectClusterNames(ids)).take(1)) - .exhaustMap((names) => { - return fromPromise(this.Confirm.confirm(` + }), + pluck('0'), + withLatestFrom(this.ConfigureState.state$.pipe(pluck('edit'))), + map(([action, edit]) => advancedSaveCompleteConfiguration(edit)) + ); + + this.confirmClustersRemovalEffect$ = this.ConfigureState.actions$.pipe( + ofType(CONFIRM_CLUSTERS_REMOVAL), + pluck('clusterIDs'), + switchMap((ids) => this.ConfigureState.state$.pipe( + this.ConfigSelectors.selectClusterNames(ids), + take(1) + )), + exhaustMap((names) => { + return from(this.Confirm.confirm(` <p>Are you sure you want to remove these clusters?</p> <ul>${names.map((name) => `<li>${name}</li>`).join('')}</ul> - `)) - .map(confirmClustersRemovalOK) - .catch(() => Observable.empty()); - }); - - this.persistRemovedClustersLocallyEffect$ = this.ConfigureState.actions$ - .let(ofType(CONFIRM_CLUSTERS_REMOVAL_OK)) - .withLatestFrom(this.ConfigureState.actions$.let(ofType(CONFIRM_CLUSTERS_REMOVAL))) - .switchMap(([, {clusterIDs}]) => of( + `)).pipe( + map(confirmClustersRemovalOK), + catchError(() => empty()) + ); + }) + ); + + this.persistRemovedClustersLocallyEffect$ = this.ConfigureState.actions$.pipe( + ofType(CONFIRM_CLUSTERS_REMOVAL_OK), + withLatestFrom(this.ConfigureState.actions$.pipe(ofType(CONFIRM_CLUSTERS_REMOVAL))), + switchMap(([, {clusterIDs}]) => of( {type: shortClustersActionTypes.REMOVE, ids: clusterIDs}, {type: clustersActionTypes.REMOVE, ids: clusterIDs} - )); - - this.persistRemovedClustersRemotelyEffect$ = this.ConfigureState.actions$ - .let(ofType(CONFIRM_CLUSTERS_REMOVAL_OK)) - .withLatestFrom( - this.ConfigureState.actions$.let(ofType(CONFIRM_CLUSTERS_REMOVAL)), - this.ConfigureState.actions$.let(ofType(shortClustersActionTypes.REMOVE)), - this.ConfigureState.actions$.let(ofType(clustersActionTypes.REMOVE)) - ) - .switchMap(([, {clusterIDs}, ...backup]) => this.Clusters.removeCluster$(clusterIDs) - .mapTo({ + )) + ); + + this.persistRemovedClustersRemotelyEffect$ = this.ConfigureState.actions$.pipe( + ofType(CONFIRM_CLUSTERS_REMOVAL_OK), + withLatestFrom( + this.ConfigureState.actions$.pipe(ofType(CONFIRM_CLUSTERS_REMOVAL)), + this.ConfigureState.actions$.pipe(ofType(shortClustersActionTypes.REMOVE)), + this.ConfigureState.actions$.pipe(ofType(clustersActionTypes.REMOVE)) + ), + switchMap(([, {clusterIDs}, ...backup]) => this.Clusters.removeCluster$(clusterIDs).pipe( + mapTo({ type: 'REMOVE_CLUSTERS_OK' - }) - .catch((e) => of( + }), + catchError((e) => of( { type: 'REMOVE_CLUSTERS_ERR', error: { @@ -594,13 +639,15 @@ export default class ConfigEffects { actions: backup } )) - ); + )) + ); - this.notifyRemoteClustersRemoveSuccessEffect$ = this.ConfigureState.actions$ - .let(ofType('REMOVE_CLUSTERS_OK')) - .withLatestFrom(this.ConfigureState.actions$.let(ofType(CONFIRM_CLUSTERS_REMOVAL))) - .do(([, {clusterIDs}]) => this.IgniteMessages.showInfo(`Cluster(s) removed: ${clusterIDs.length}`)) - .ignoreElements(); + this.notifyRemoteClustersRemoveSuccessEffect$ = this.ConfigureState.actions$.pipe( + ofType('REMOVE_CLUSTERS_OK'), + withLatestFrom(this.ConfigureState.actions$.pipe(ofType(CONFIRM_CLUSTERS_REMOVAL))), + tap(([, {clusterIDs}]) => this.IgniteMessages.showInfo(`Cluster(s) removed: ${clusterIDs.length}`)), + ignoreElements() + ); const _applyChangedIDs = (edit, {cache, igfs, model, cluster} = {}) => ({ cluster: { @@ -616,22 +663,24 @@ export default class ConfigEffects { }); this.advancedSaveCacheEffect$ = merge( - this.ConfigureState.actions$.let(ofType(ADVANCED_SAVE_CLUSTER)), - this.ConfigureState.actions$.let(ofType(ADVANCED_SAVE_CACHE)), - this.ConfigureState.actions$.let(ofType(ADVANCED_SAVE_IGFS)), - this.ConfigureState.actions$.let(ofType(ADVANCED_SAVE_MODEL)), - ) - .withLatestFrom(this.ConfigureState.state$.pluck('edit')) - .map(([action, edit]) => ({ + this.ConfigureState.actions$.pipe(ofType(ADVANCED_SAVE_CLUSTER)), + this.ConfigureState.actions$.pipe(ofType(ADVANCED_SAVE_CACHE)), + this.ConfigureState.actions$.pipe(ofType(ADVANCED_SAVE_IGFS)), + this.ConfigureState.actions$.pipe(ofType(ADVANCED_SAVE_MODEL)), + ).pipe( + withLatestFrom(this.ConfigureState.state$.pipe(pluck('edit'))), + map(([action, edit]) => ({ type: ADVANCED_SAVE_COMPLETE_CONFIGURATION, changedItems: _applyChangedIDs(edit, action) - })); - - this.basicSaveEffect$ = this.ConfigureState.actions$ - .let(ofType(BASIC_SAVE)) - .merge(this.ConfigureState.actions$.let(ofType(BASIC_SAVE_AND_DOWNLOAD))) - .withLatestFrom(this.ConfigureState.state$.pluck('edit')) - .switchMap(([action, edit]) => { + })) + ); + + this.basicSaveEffect$ = merge( + this.ConfigureState.actions$.pipe(ofType(BASIC_SAVE)), + this.ConfigureState.actions$.pipe(ofType(BASIC_SAVE_AND_DOWNLOAD)) + ).pipe( + withLatestFrom(this.ConfigureState.state$.pipe(pluck('edit'))), + switchMap(([action, edit]) => { const changedItems = _applyChangedIDs(edit, {cluster: action.cluster}); const actions = [{ type: cachesActionTypes.UPSERT, @@ -651,24 +700,27 @@ export default class ConfigEffects { } ].filter((a) => a.items.length); - return Observable.of(...actions) - .merge( - Observable.fromPromise(this.Clusters.saveBasic(changedItems)) - .switchMap((res) => Observable.of( - {type: 'EDIT_CLUSTER', cluster: changedItems.cluster}, - basicSaveOK(changedItems) - )) - .catch((res) => Observable.of( - basicSaveErr(changedItems, res), - {type: 'UNDO_ACTIONS', actions} - )) + return merge( + of(...actions), + from(this.Clusters.saveBasic(changedItems)).pipe( + switchMap((res) => of( + {type: 'EDIT_CLUSTER', cluster: changedItems.cluster}, + basicSaveOK(changedItems) + )), + catchError((res) => of( + basicSaveErr(changedItems, res), + {type: 'UNDO_ACTIONS', actions} + )) + ) ); - }); + }) + ); - this.basicSaveOKMessagesEffect$ = this.ConfigureState.actions$ - .let(ofType(BASIC_SAVE_OK)) - .do((action) => this.IgniteMessages.showInfo(`Cluster "${action.changedItems.cluster.name}" saved.`)) - .ignoreElements(); + this.basicSaveOKMessagesEffect$ = this.ConfigureState.actions$.pipe( + ofType(BASIC_SAVE_OK), + tap((action) => this.IgniteMessages.showInfo(`Cluster "${action.changedItems.cluster.name}" saved.`)), + ignoreElements() + ); } /** @@ -691,21 +743,21 @@ export default class ConfigEffects { setTimeout(() => this.ConfigureState.dispatchAction(action)); - return this.ConfigureState.actions$ - .filter((a) => a.type === ok || a.type === err) - .take(1) - .map((a) => { + return this.ConfigureState.actions$.pipe( + filter((a) => a.type === ok || a.type === err), + take(1), + map((a) => { if (a.type === err) throw a; else return a; }) - .toPromise(); + ).toPromise(); }; connect() { return merge( ...Object.keys(this).filter((k) => k.endsWith('Effect$')).map((k) => this[k]) - ).do((a) => this.ConfigureState.dispatchAction(a)).subscribe(); + ).pipe(tap((a) => this.ConfigureState.dispatchAction(a))).subscribe(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7d4e1fd1/modules/web-console/frontend/app/components/page-configure/store/effects.spec.js ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/app/components/page-configure/store/effects.spec.js b/modules/web-console/frontend/app/components/page-configure/store/effects.spec.js index 10e0663..f8b3909 100644 --- a/modules/web-console/frontend/app/components/page-configure/store/effects.spec.js +++ b/modules/web-console/frontend/app/components/page-configure/store/effects.spec.js @@ -16,11 +16,10 @@ */ import {assert} from 'chai'; -import {of} from 'rxjs/observable/of'; -import {_throw} from 'rxjs/observable/throw'; +import {of, throwError} from 'rxjs'; +import {TestScheduler} from 'rxjs/testing'; import {default as Effects} from './effects'; import {default as Selectors} from './selectors'; -import {TestScheduler} from 'rxjs/testing/TestScheduler'; const makeMocks = (target, mocks) => new Map(target.$inject.map((provider) => { return (provider in mocks) ? [provider, mocks[provider]] : [provider, {}]; @@ -121,7 +120,7 @@ suite('Configuration store effects', () => { stateMarbles: 'AB', mocks: { ...mocks, - Clusters: {getCluster: () => _throw({data: 'Error'})} + Clusters: {getCluster: () => throwError({data: 'Error'})} } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/7d4e1fd1/modules/web-console/frontend/app/components/page-configure/store/selectors.js ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/app/components/page-configure/store/selectors.js b/modules/web-console/frontend/app/components/page-configure/store/selectors.js index c9df643..0c70e59 100644 --- a/modules/web-console/frontend/app/components/page-configure/store/selectors.js +++ b/modules/web-console/frontend/app/components/page-configure/store/selectors.js @@ -16,13 +16,8 @@ */ import {uniqueName} from 'app/utils/uniqueName'; -import {of} from 'rxjs/observable/of'; -import {empty} from 'rxjs/observable/empty'; -import {combineLatest} from 'rxjs/observable/combineLatest'; -import 'rxjs/add/operator/filter'; -import 'rxjs/add/operator/mergeMap'; -import 'rxjs/add/observable/combineLatest'; -import {Observable} from 'rxjs/Observable'; +import {of, empty, combineLatest, forkJoin, pipe} from 'rxjs'; +import {filter, pluck, map, switchMap, take, distinctUntilChanged, exhaustMap} from 'rxjs/operators'; import {defaultNames} from '../defaultNames'; import {default as Caches} from 'app/services/Caches'; @@ -30,22 +25,22 @@ import {default as Clusters} from 'app/services/Clusters'; import {default as IGFSs} from 'app/services/IGFSs'; import {default as Models} from 'app/services/Models'; -const isDefined = (s) => s.filter((v) => v); +const isDefined = filter((v) => v); -const selectItems = (path) => (s) => s.filter((s) => s).pluck(path).filter((v) => v); +const selectItems = (path) => pipe(filter((s) => s), pluck(path), filter((v) => v)); -const selectValues = (s) => s.map((v) => v && [...v.value.values()]); +const selectValues = map((v) => v && [...v.value.values()]); -export const selectMapItem = (mapPath, key) => (s) => s.pluck(mapPath).map((v) => v && v.get(key)); +export const selectMapItem = (mapPath, key) => pipe(pluck(mapPath), map((v) => v && v.get(key))); -const selectMapItems = (mapPath, keys) => (s) => s.pluck(mapPath).map((v) => v && keys.map((key) => v.get(key))); +const selectMapItems = (mapPath, keys) => pipe(pluck(mapPath), map((v) => v && keys.map((key) => v.get(key)))); -const selectItemToEdit = ({items, itemFactory, defaultName = '', itemID}) => (s) => s.switchMap((item) => { +const selectItemToEdit = ({items, itemFactory, defaultName = '', itemID}) => switchMap((item) => { if (item) return of(Object.assign(itemFactory(), item)); if (itemID === 'new') - return items.take(1).map((items) => Object.assign(itemFactory(), {name: uniqueName(defaultName, items)})); + return items.pipe(take(1), map((items) => Object.assign(itemFactory(), {name: uniqueName(defaultName, items)}))); if (!itemID) return of(null); @@ -54,22 +49,24 @@ const selectItemToEdit = ({items, itemFactory, defaultName = '', itemID}) => (s) }); const currentShortItems = ({changesKey, shortKey}) => (state$) => { - return Observable.combineLatest( - state$.pluck('edit', 'changes', changesKey).let(isDefined).distinctUntilChanged(), - state$.pluck(shortKey, 'value').let(isDefined).distinctUntilChanged() - ) - .map(([{ids = [], changedItems}, shortItems]) => { + return combineLatest( + state$.pipe(pluck('edit', 'changes', changesKey), isDefined, distinctUntilChanged()), + state$.pipe(pluck(shortKey, 'value'), isDefined, distinctUntilChanged()) + ).pipe( + map(([{ids = [], changedItems}, shortItems]) => { if (!ids.length || !shortItems) return []; return ids.map((id) => changedItems.find(({_id}) => _id === id) || shortItems.get(id)); - }) - .map((v) => v.filter((v) => v)); + }), + map((v) => v.filter((v) => v)) + ); }; -const selectNames = (itemIDs, nameAt = 'name') => (items) => items - .pluck('value') - .map((items) => itemIDs.map((id) => items.get(id)[nameAt])); +const selectNames = (itemIDs, nameAt = 'name') => pipe( + pluck('value'), + map((items) => itemIDs.map((id) => items.get(id)[nameAt])) +); export default class ConfigSelectors { static $inject = ['Caches', 'Clusters', 'IGFSs', 'Models']; @@ -95,17 +92,18 @@ export default class ConfigSelectors { * @returns {(state$: Observable) => Observable<{pristine: boolean, value: Map<string, ig.config.model.ShortDomainModel>}>} */ this.selectShortModels = () => selectItems('shortModels'); - this.selectShortModelsValue = () => (state$) => state$.let(this.selectShortModels()).let(selectValues); + this.selectShortModelsValue = () => (state$) => state$.pipe(this.selectShortModels(), selectValues); /** * @returns {(state$: Observable) => Observable<Array<ig.config.cluster.ShortCluster>>} */ - this.selectShortClustersValue = () => (state$) => state$.let(this.selectShortClusters()).let(selectValues); + this.selectShortClustersValue = () => (state$) => state$.pipe(this.selectShortClusters(), selectValues); /** * @returns {(state$: Observable) => Observable<Array<string>>} */ - this.selectClusterNames = (clusterIDs) => (state$) => state$ - .let(this.selectShortClusters()) - .let(selectNames(clusterIDs)); + this.selectClusterNames = (clusterIDs) => (state$) => state$.pipe( + this.selectShortClusters(), + selectNames(clusterIDs) + ); } selectCluster = (id) => selectMapItem('clusters', id); @@ -118,52 +116,56 @@ export default class ConfigSelectors { selectShortCaches = () => selectItems('shortCaches'); - selectShortCachesValue = () => (state$) => state$.let(this.selectShortCaches()).let(selectValues); + selectShortCachesValue = () => (state$) => state$.pipe(this.selectShortCaches(), selectValues); selectShortIGFSs = () => selectItems('shortIgfss'); - selectShortIGFSsValue = () => (state$) => state$.let(this.selectShortIGFSs()).let(selectValues); + selectShortIGFSsValue = () => (state$) => state$.pipe(this.selectShortIGFSs(), selectValues); - selectShortModelsValue = () => (state$) => state$.let(this.selectShortModels()).let(selectValues); + selectShortModelsValue = () => (state$) => state$.pipe(this.selectShortModels(), selectValues); - selectCacheToEdit = (cacheID) => (state$) => state$ - .let(this.selectCache(cacheID)) - .distinctUntilChanged() - .let(selectItemToEdit({ - items: state$.let(this.selectCurrentShortCaches), + selectCacheToEdit = (cacheID) => (state$) => state$.pipe( + this.selectCache(cacheID), + distinctUntilChanged(), + selectItemToEdit({ + items: state$.pipe(this.selectCurrentShortCaches), itemFactory: () => this.Caches.getBlankCache(), defaultName: defaultNames.cache, itemID: cacheID - })); + }) + ); - selectIGFSToEdit = (itemID) => (state$) => state$ - .let(this.selectIGFS(itemID)) - .distinctUntilChanged() - .let(selectItemToEdit({ - items: state$.let(this.selectCurrentShortIGFSs), + selectIGFSToEdit = (itemID) => (state$) => state$.pipe( + this.selectIGFS(itemID), + distinctUntilChanged(), + selectItemToEdit({ + items: state$.pipe(this.selectCurrentShortIGFSs), itemFactory: () => this.IGFSs.getBlankIGFS(), defaultName: defaultNames.igfs, itemID - })); + }) + ); - selectModelToEdit = (itemID) => (state$) => state$ - .let(this.selectModel(itemID)) - .distinctUntilChanged() - .let(selectItemToEdit({ - items: state$.let(this.selectCurrentShortModels), + selectModelToEdit = (itemID) => (state$) => state$.pipe( + this.selectModel(itemID), + distinctUntilChanged(), + selectItemToEdit({ + items: state$.pipe(this.selectCurrentShortModels), itemFactory: () => this.Models.getBlankModel(), itemID - })); + }) + ); - selectClusterToEdit = (clusterID, defaultName = defaultNames.cluster) => (state$) => state$ - .let(this.selectCluster(clusterID)) - .distinctUntilChanged() - .let(selectItemToEdit({ - items: state$.let(this.selectShortClustersValue()), + selectClusterToEdit = (clusterID, defaultName = defaultNames.cluster) => (state$) => state$.pipe( + this.selectCluster(clusterID), + distinctUntilChanged(), + selectItemToEdit({ + items: state$.pipe(this.selectShortClustersValue()), itemFactory: () => this.Clusters.getBlankCluster(), defaultName, itemID: clusterID - })); + }) + ); selectCurrentShortCaches = currentShortItems({changesKey: 'caches', shortKey: 'shortCaches'}); @@ -176,34 +178,33 @@ export default class ConfigSelectors { return of([]); return combineLatest( - state$.let(this.selectCluster(clusterID)).pluck('caches'), - state$.let(this.selectShortCaches()).pluck('value'), + state$.pipe(this.selectCluster(clusterID), pluck('caches')), + state$.pipe(this.selectShortCaches(), pluck('value')), (ids, items) => ids.map((id) => items.get(id)) ); }; selectCompleteClusterConfiguration = ({clusterID, isDemo}) => (state$) => { const hasValues = (array) => !array.some((v) => !v); - return state$.let(this.selectCluster(clusterID)) - .exhaustMap((cluster) => { - if (!cluster) - return of({__isComplete: false}); - - const withSpace = (array) => array.map((c) => ({...c, space: cluster.space})); - - return Observable.forkJoin( - state$.let(selectMapItems('caches', cluster.caches || [])).take(1), - state$.let(selectMapItems('models', cluster.models || [])).take(1), - state$.let(selectMapItems('igfss', cluster.igfss || [])).take(1), - ) - .map(([caches, models, igfss]) => ({ - cluster, - caches, - domains: models, - igfss, - spaces: [{_id: cluster.space, demo: isDemo}], - __isComplete: !!cluster && !(!hasValues(caches) || !hasValues(models) || !hasValues(igfss)) - })); - }); + return state$.pipe( + this.selectCluster(clusterID), + exhaustMap((cluster) => { + if (!cluster) + return of({__isComplete: false}); + + return forkJoin( + state$.pipe(selectMapItems('caches', cluster.caches || []), take(1)), + state$.pipe(selectMapItems('models', cluster.models || []), take(1)), + state$.pipe(selectMapItems('igfss', cluster.igfss || []), take(1)), + ).pipe(map(([caches, models, igfss]) => ({ + cluster, + caches, + domains: models, + igfss, + spaces: [{_id: cluster.space, demo: isDemo}], + __isComplete: !!cluster && !(!hasValues(caches) || !hasValues(models) || !hasValues(igfss)) + }))); + }) + ); }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/7d4e1fd1/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/controller.ts ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/controller.ts b/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/controller.ts index 3ac3177..78274e6 100644 --- a/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/controller.ts +++ b/modules/web-console/frontend/app/components/page-queries/components/queries-notebook/controller.ts @@ -18,15 +18,8 @@ import _ from 'lodash'; import {nonEmpty, nonNil} from 'app/utils/lodashMixins'; import id8 from 'app/utils/id8'; -import 'rxjs/add/operator/mergeMap'; -import 'rxjs/add/operator/merge'; -import 'rxjs/add/operator/switchMap'; -import 'rxjs/add/operator/exhaustMap'; -import 'rxjs/add/operator/distinctUntilChanged'; - -import { fromPromise } from 'rxjs/observable/fromPromise'; -import { timer } from 'rxjs/observable/timer'; -import { defer } from 'rxjs/observable/defer'; +import {timer, merge, defer, from} from 'rxjs'; +import {mergeMap, tap, switchMap, exhaustMap, take} from 'rxjs/operators'; import {CSV} from 'app/services/CSV'; @@ -930,27 +923,28 @@ export class NotebookCtrl { }; const _startWatch = () => { - const awaitClusters$ = fromPromise( + const awaitClusters$ = from( agentMgr.startClusterWatch('Leave Queries', 'default-state')); const finishLoading$ = defer(() => { if (!$root.IgniteDemoMode) Loading.finish('sqlLoading'); - }).take(1); + }).pipe(take(1)); const refreshCaches = (period) => { - return timer(0, period).exhaustMap(() => _refreshCaches()).merge(finishLoading$); + return merge(timer(0, period).pipe(exhaustMap(() => _refreshCaches())), finishLoading$); }; - this.refresh$ = awaitClusters$ - .mergeMap(() => agentMgr.currentCluster$) - .do(() => Loading.start('sqlLoading')) - .do(() => { + this.refresh$ = awaitClusters$.pipe( + mergeMap(() => agentMgr.currentCluster$), + tap(() => Loading.start('sqlLoading')), + tap(() => { _.forEach($scope.notebook.paragraphs, (paragraph) => { paragraph.reset($interval); }); - }) - .switchMap(() => refreshCaches(5000)) + }), + switchMap(() => refreshCaches(5000)) + ) .subscribe(); }; http://git-wip-us.apache.org/repos/asf/ignite/blob/7d4e1fd1/modules/web-console/frontend/app/modules/agent/AgentManager.service.js ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/app/modules/agent/AgentManager.service.js b/modules/web-console/frontend/app/modules/agent/AgentManager.service.js index ef0d423..56eef35 100644 --- a/modules/web-console/frontend/app/modules/agent/AgentManager.service.js +++ b/modules/web-console/frontend/app/modules/agent/AgentManager.service.js @@ -18,11 +18,8 @@ import _ from 'lodash'; import {nonEmpty, nonNil} from 'app/utils/lodashMixins'; -import {BehaviorSubject} from 'rxjs/BehaviorSubject'; -import 'rxjs/add/operator/first'; -import 'rxjs/add/operator/partition'; -import 'rxjs/add/operator/takeUntil'; -import 'rxjs/add/operator/pluck'; +import {BehaviorSubject} from 'rxjs'; +import {first, pluck, tap, distinctUntilChanged, map, filter} from 'rxjs/operators'; import AgentModal from './AgentModal.service'; // @ts-ignore @@ -185,14 +182,16 @@ export default class AgentManager { let prevCluster; - this.currentCluster$ = this.connectionSbj - .distinctUntilChanged(({ cluster }) => prevCluster === cluster) - .do(({ cluster }) => prevCluster = cluster); + this.currentCluster$ = this.connectionSbj.pipe( + distinctUntilChanged(({ cluster }) => prevCluster === cluster), + tap(({ cluster }) => prevCluster = cluster) + ); - this.clusterIsActive$ = this.connectionSbj - .map(({ cluster }) => cluster) - .filter((cluster) => Boolean(cluster)) - .pluck('active'); + this.clusterIsActive$ = this.connectionSbj.pipe( + map(({ cluster }) => cluster), + filter((cluster) => Boolean(cluster)), + pluck('active') + ); if (!this.isDemoMode()) { this.connectionSbj.subscribe({ @@ -524,7 +523,7 @@ export default class AgentManager { if (this.isDemoMode()) return Promise.resolve(this._executeOnActiveCluster({}, {}, event, params)); - return this.connectionSbj.first().toPromise() + return this.connectionSbj.pipe(first()).toPromise() .then(({cluster}) => { if (_.isNil(cluster)) throw new Error('Failed to execute request on cluster.'); http://git-wip-us.apache.org/repos/asf/ignite/blob/7d4e1fd1/modules/web-console/frontend/app/modules/user/User.service.js ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/app/modules/user/User.service.js b/modules/web-console/frontend/app/modules/user/User.service.js index 3515c1b..da84dcc 100644 --- a/modules/web-console/frontend/app/modules/user/User.service.js +++ b/modules/web-console/frontend/app/modules/user/User.service.js @@ -15,7 +15,7 @@ * limitations under the License. */ -import {ReplaySubject} from 'rxjs/ReplaySubject'; +import {ReplaySubject} from 'rxjs'; /** * @typedef User http://git-wip-us.apache.org/repos/asf/ignite/blob/7d4e1fd1/modules/web-console/frontend/app/services/Clusters.js ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/app/services/Clusters.js b/modules/web-console/frontend/app/services/Clusters.js index e0a2ec7..5649f0a 100644 --- a/modules/web-console/frontend/app/services/Clusters.js +++ b/modules/web-console/frontend/app/services/Clusters.js @@ -16,8 +16,7 @@ */ import get from 'lodash/get'; -import {Observable} from 'rxjs/Observable'; -import 'rxjs/add/observable/fromPromise'; +import {from} from 'rxjs'; import ObjectID from 'bson-objectid/objectid'; import {uniqueName} from 'app/utils/uniqueName'; import omit from 'lodash/fp/omit'; @@ -117,7 +116,7 @@ export default class Clusters { } getClustersOverview$() { - return Observable.fromPromise(this.getClustersOverview()); + return from(this.getClustersOverview()); } saveCluster(cluster) { @@ -125,7 +124,7 @@ export default class Clusters { } saveCluster$(cluster) { - return Observable.fromPromise(this.saveCluster(cluster)); + return from(this.saveCluster(cluster)); } removeCluster(cluster) { @@ -133,7 +132,7 @@ export default class Clusters { } removeCluster$(cluster) { - return Observable.fromPromise(this.removeCluster(cluster)); + return from(this.removeCluster(cluster)); } saveBasic(changedItems) { http://git-wip-us.apache.org/repos/asf/ignite/blob/7d4e1fd1/modules/web-console/frontend/app/services/Version.service.js ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/app/services/Version.service.js b/modules/web-console/frontend/app/services/Version.service.js index 8bbf6ef..8b76d58 100644 --- a/modules/web-console/frontend/app/services/Version.service.js +++ b/modules/web-console/frontend/app/services/Version.service.js @@ -15,7 +15,7 @@ * limitations under the License. */ -import { BehaviorSubject } from 'rxjs/BehaviorSubject'; +import { BehaviorSubject } from 'rxjs'; import _ from 'lodash'; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7d4e1fd1/modules/web-console/frontend/app/services/store.ts ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/app/services/store.ts b/modules/web-console/frontend/app/services/store.ts index 19dde69..4498d1e 100644 --- a/modules/web-console/frontend/app/services/store.ts +++ b/modules/web-console/frontend/app/services/store.ts @@ -15,10 +15,8 @@ * limitations under the License. */ -import {BehaviorSubject} from 'rxjs/BehaviorSubject'; -import {Subject} from 'rxjs/Subject'; +import {BehaviorSubject, Subject, merge} from 'rxjs'; import {scan, tap} from 'rxjs/operators'; -import {merge} from 'rxjs/observable/merge'; interface Reducer<State, Actions> { (state: State, action: Actions): State @@ -27,9 +25,9 @@ interface Reducer<State, Actions> { export class Store<Actions, State> { static $inject = ['$injector']; - actions$: Subject<Actions> - state$: BehaviorSubject<State> - private _reducers: Array<Reducer<State, Actions>> + actions$: Subject<Actions>; + state$: BehaviorSubject<State>; + private _reducers: Array<Reducer<State, Actions>>; constructor(private $injector: ng.auto.IInjectorService) { this.$injector = $injector; http://git-wip-us.apache.org/repos/asf/ignite/blob/7d4e1fd1/modules/web-console/frontend/app/store/ofType.ts ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/app/store/ofType.ts b/modules/web-console/frontend/app/store/ofType.ts index 3d3bd8e..220de5f 100644 --- a/modules/web-console/frontend/app/store/ofType.ts +++ b/modules/web-console/frontend/app/store/ofType.ts @@ -15,7 +15,7 @@ * limitations under the License. */ -import {OperatorFunction} from 'rxjs/interfaces'; +import {OperatorFunction} from 'rxjs'; import {filter} from 'rxjs/operators'; type Action = {type: string}; http://git-wip-us.apache.org/repos/asf/ignite/blob/7d4e1fd1/modules/web-console/frontend/app/utils/SimpleWorkerPool.js ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/app/utils/SimpleWorkerPool.js b/modules/web-console/frontend/app/utils/SimpleWorkerPool.js index 8e7aab7..25c7ae4 100644 --- a/modules/web-console/frontend/app/utils/SimpleWorkerPool.js +++ b/modules/web-console/frontend/app/utils/SimpleWorkerPool.js @@ -16,15 +16,8 @@ */ import _ from 'lodash'; - -import {Observable} from 'rxjs/Observable'; -import {Subject} from 'rxjs/Subject'; -import 'rxjs/add/observable/race'; -import 'rxjs/add/operator/filter'; -import 'rxjs/add/operator/map'; -import 'rxjs/add/operator/pluck'; -import 'rxjs/add/operator/take'; -import 'rxjs/add/operator/toPromise'; +import {Subject, race} from 'rxjs'; +import {filter, take, pluck, map} from 'rxjs/operators'; /** * Simple implementation of workers pool. @@ -112,11 +105,11 @@ export default class SimpleWorkerPool { this._run(); - return Observable.race( - this.messages$.filter((e) => e.tid === tid).take(1).pluck('m', 'data'), - this.errors$.filter((e) => e.tid === tid).take(1).map((e) => { + return race( + this.messages$.pipe(filter((e) => e.tid === tid), take(1), pluck('m', 'data')), + this.errors$.pipe(filter((e) => e.tid === tid), take(1), map((e) => { throw e.e; })) - .take(1).toPromise(); + ).pipe(take(1)).toPromise(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7d4e1fd1/modules/web-console/frontend/package.json ---------------------------------------------------------------------- diff --git a/modules/web-console/frontend/package.json b/modules/web-console/frontend/package.json index e8f4ae2..9d848df 100644 --- a/modules/web-console/frontend/package.json +++ b/modules/web-console/frontend/package.json @@ -30,7 +30,7 @@ "@babel/plugin-transform-parameters": "7.0.0", "@uirouter/angularjs": "1.0.20", "@uirouter/core": "5.0.19", - "@uirouter/rx": "0.4.1", + "@uirouter/rx": "0.5.0", "@uirouter/visualizer": "4.0.2", "angular": "1.7.4", "angular-acl": "0.1.10", @@ -70,7 +70,7 @@ "pako": "1.0.6", "resize-observer-polyfill": "1.5.0", "roboto-font": "0.1.0", - "rxjs": "5.5.11", + "rxjs": "6.3.3", "socket.io-client": "1.7.3", "tf-metatags": "2.0.0" },
