Repository: storm Updated Branches: refs/heads/master 177cb95f4 -> a4fc110f7
Adding storm blobstore migrator. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4af7da0f Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4af7da0f Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4af7da0f Branch: refs/heads/master Commit: 4af7da0ff48bd347aa49509744c0e6a8e749341f Parents: da2f035 Author: Kyle Nusbaum <[email protected]> Authored: Tue Sep 26 15:17:32 2017 -0500 Committer: Kyle Nusbaum <[email protected]> Committed: Tue Sep 26 15:17:32 2017 -0500 ---------------------------------------------------------------------- external/storm-blobstore-migration/Makefile | 19 +++ external/storm-blobstore-migration/README.md | 87 +++++++++++++ .../storm-blobstore-migration/config.sample | 8 ++ external/storm-blobstore-migration/listHDFS.sh | 11 ++ external/storm-blobstore-migration/listLocal.sh | 11 ++ external/storm-blobstore-migration/migrate.sh | 13 ++ external/storm-blobstore-migration/pom.xml | 126 +++++++++++++++++++ .../org/apache/storm/blobstore/ListHDFS.java | 52 ++++++++ .../org/apache/storm/blobstore/ListLocalFs.java | 42 +++++++ .../apache/storm/blobstore/MigrateBlobs.java | 125 ++++++++++++++++++ .../apache/storm/blobstore/MigratorMain.java | 42 +++++++ .../org/apache/storm/blobstore/AppTest.java | 38 ++++++ pom.xml | 1 + 13 files changed, 575 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/Makefile ---------------------------------------------------------------------- diff --git a/external/storm-blobstore-migration/Makefile b/external/storm-blobstore-migration/Makefile new file mode 100644 index 0000000..891318c --- /dev/null +++ b/external/storm-blobstore-migration/Makefile @@ -0,0 +1,19 @@ +PACKAGE_NAME=blobstore-migrator.tgz + +VERSION=$(shell cat VERSION || mvn help:evaluate -Dexpression=project.version | grep -v '^\[') + +all: 
$(PACKAGE_NAME) + +$(PACKAGE_NAME) : VERSION target/blobstore-migrator-$(VERSION).jar + -@rm -Rf blobstore-migrator $(PACKAGE_NAME) + mkdir blobstore-migrator + cp target/blobstore-migrator-$(VERSION).jar blobstore-migrator/ + cp listHDFS.sh listLocal.sh migrate.sh VERSION blobstore-migrator/ + tar -cvzf $(PACKAGE_NAME) blobstore-migrator +# rm -Rf blobstore-migrator + +target/blobstore-migrator-$(VERSION).jar : + mvn clean install + +VERSION : + echo $(VERSION) >VERSION http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/README.md ---------------------------------------------------------------------- diff --git a/external/storm-blobstore-migration/README.md b/external/storm-blobstore-migration/README.md new file mode 100644 index 0000000..894b9d6 --- /dev/null +++ b/external/storm-blobstore-migration/README.md @@ -0,0 +1,87 @@ +# Blobstore Migrator + +## Basic Use +----- + +### Build The Thing +Use make to build a tarball with everything needed. +``` +$ make +``` + +### Use The Thing +Copy and extract the tarball +``` +$ scp blobstore-migrator.tgz my-nimbus-host.example.com:~/ +$ ssh my-nimbus-host.example.com +... On my-nimbus-host ... +$ tar -xvzf blobstore-migrator.tgz +``` + +This will expand into a blobstore-migrator directory with all the scripts and the jar. +``` +$ cd blobstore-migrator +$ ls +blobstore-migrator-2.0.jar listHDFS.sh listLocal.sh migrate.sh +``` + +To run, first create a config for the cluster. +The config must be named 'config' +It must contain definitions for `HDFS_BLOBSTORE_DIR`, `LOCAL_BLOBSTORE_DIR`, and `HADOOP_CLASSPATH`. +Hadoop jars are packaged with neither storm nor this package, so they must be installed separately. 
+ +Optional configs used to configure security are: `BLOBSTORE_PRINCIPAL`, `KEYTAB_FILE`, and `JAAS_CONF` + +Example: +``` +$ cat config +HDFS_BLOBSTORE_DIR='hdfs://some-hdfs-namenode:8080/srv/storm/my-storm-blobstore' +LOCAL_BLOBSTORE_DIR='/srv/storm' +HADOOP_CLASSPATH='/hadoop/share/hdfs/*:/hadoop/common/*' + +# My security configs: +BLOBSTORE_PRINCIPAL='stormUser/[email protected]' +KEYTAB_FILE='/srv/my-keytab/stormUser.kt' +JAAS_CONF='/storm/conf/storm_jaas.conf' +``` + +Now you can run any of the scripts, all of which require config to exist: + - listHDFS.sh: lists all blobs currently in the HDFS Blobstore + - listLocal.sh: lists all blobs currently in the local Blobstore + - migrate.sh: Begins the migration process for Nimbus. (Read instructions below first) + + +#### Migrating +##### Nimbus +To migrate blobs from nimbus, the following steps are necessary: + +1. Shut down Nimbus +2. Back up the storm config +3. Change the following settings in Nimbus' storm config: + * blobstore.dir + * blobstore.hdfs.principal + * blobstore.hdfs.keytab + * blobstore.replication.factor + * nimbus.blobstore.class +4. Configure server so that the environment variable `STORM_EXT_CLASSPATH` includes whatever `HADOOP_CLASSPATH` contains when `storm nimbus` is run. +5. Run the migrate.sh script. It will migrate the blobs from the LocalFsBlobStore to the HdfsBlobStore, and then exit. +6. Double check to make sure the storm configs look sane, and the blobs are where they should be. (listHDFS.sh, listLocal.sh) + +Once everything looks good, start Nimbus and the Nimbus BlobStore migration will be done. + +If something goes wrong during this process, restore the config that you backed up in step 2 and then start Nimbus. Nimbus will use the Local Blobstore as before. + +##### Supervisors +Supervisors can be upgraded by performing the following steps: +1. Shut down the supervisor. +2. 
Put the following blobstore settings in place: + * blobstore.dir + * blobstore.hdfs.principal + * blobstore.hdfs.keytab + * blobstore.replication.factor + * supervisor.blobstore.class +3. Kill all remaining worker processes (this is ugly) +4. Wipe the local state +5. Start the supervisor. + +The supervisor state is hard-wiped because of spurious errors during supervisor migration that were only solved by wiping out the local state. This may not be the best solution, but it does seem to work predictably. http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/config.sample ---------------------------------------------------------------------- diff --git a/external/storm-blobstore-migration/config.sample b/external/storm-blobstore-migration/config.sample new file mode 100644 index 0000000..b4590ff --- /dev/null +++ b/external/storm-blobstore-migration/config.sample @@ -0,0 +1,8 @@ +HDFS_BLOBSTORE_DIR='hdfs://some-hdfs-namenode:8080/srv/storm/my-storm-blobstore' +LOCAL_BLOBSTORE_DIR='/srv/storm' +HADOOP_CLASSPATH='/hadoop-2.6.5/etc/hadoop/:/hadoop-2.6.5/share/hadoop/common/lib/*:/hadoop-2.6.5/share/hadoop/common/*:/hadoop-2.6.5/share/hadoop/hdfs:/hadoop-2.6.5/share/hadoop/hdfs/lib/*:/hadoop-2.6.5/share/hadoop/hdfs/*:/hadoop-2.6.5/share/hadoop/yarn/lib/*:/hadoop-2.6.5/share/hadoop/yarn/*:/hadoop-2.6.5/share/hadoop/mapreduce/lib/*:/hadoop-2.6.5/share/hadoop/mapreduce/*' + +## Optional security configs +BLOBSTORE_PRINCIPAL='stormUser/[email protected]' +KEYTAB_FILE='/srv/my-keytab/stormUser.kt' +JAAS_CONF='/storm/conf/storm_jaas.conf' \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/listHDFS.sh ---------------------------------------------------------------------- diff --git a/external/storm-blobstore-migration/listHDFS.sh b/external/storm-blobstore-migration/listHDFS.sh new file mode 100755 index 0000000..42c82e6 --- /dev/null +++ 
b/external/storm-blobstore-migration/listHDFS.sh @@ -0,0 +1,11 @@ +#!/usr/bin/env bash + +. config +VERSION=`cat VERSION` +MIGRATION_JAR=blobstore-migrator-${VERSION}.jar + +if [ -n "$JAAS_CONF" ]; then + java -Djava.security.auth.login.config=$JAAS_CONF -cp $HADOOP_CLASSPATH:$MIGRATION_JAR org.apache.storm.blobstore.MigratorMain listHDFS $HDFS_BLOBSTORE_DIR $BLOBSTORE_PRINCIPAL $KEYTAB_FILE; +else + java -cp $HADOOP_CLASSPATH:$MIGRATION_JAR org.apache.storm.blobstore.MigratorMain listHDFS $HDFS_BLOBSTORE_DIR $BLOBSTORE_PRINCIPAL $KEYTAB_FILE; +fi http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/listLocal.sh ---------------------------------------------------------------------- diff --git a/external/storm-blobstore-migration/listLocal.sh b/external/storm-blobstore-migration/listLocal.sh new file mode 100755 index 0000000..db6cc33 --- /dev/null +++ b/external/storm-blobstore-migration/listLocal.sh @@ -0,0 +1,11 @@ +#!/usr/bin/env bash + +. config +VERSION=`cat VERSION` +MIGRATION_JAR=blobstore-migrator-${VERSION}.jar + +if [ -n "$JAAS_CONF" ]; then + java -Djava.security.auth.login.config=$JAAS_CONF -cp $HADOOP_CLASSPATH:$MIGRATION_JAR org.apache.storm.blobstore.MigratorMain listLocalFs $LOCAL_BLOBSTORE_DIR +else + java -cp $HADOOP_CLASSPATH:$MIGRATION_JAR org.apache.storm.blobstore.MigratorMain listLocalFs $LOCAL_BLOBSTORE_DIR +fi http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/migrate.sh ---------------------------------------------------------------------- diff --git a/external/storm-blobstore-migration/migrate.sh b/external/storm-blobstore-migration/migrate.sh new file mode 100755 index 0000000..383d05b --- /dev/null +++ b/external/storm-blobstore-migration/migrate.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +. 
config +VERSION=`cat VERSION` +MIGRATION_JAR=blobstore-migrator-${VERSION}.jar + +if [ -n "$JAAS_CONF" ]; then # JAAS_CONF is set: hand it to the JVM as the login config + java -Djava.security.auth.login.config=$JAAS_CONF -cp $HADOOP_CLASSPATH:$MIGRATION_JAR org.apache.storm.blobstore.MigratorMain migrate $LOCAL_BLOBSTORE_DIR $HDFS_BLOBSTORE_DIR $BLOBSTORE_PRINCIPAL $KEYTAB_FILE +else + java -cp $HADOOP_CLASSPATH:$MIGRATION_JAR org.apache.storm.blobstore.MigratorMain migrate $LOCAL_BLOBSTORE_DIR $HDFS_BLOBSTORE_DIR $BLOBSTORE_PRINCIPAL $KEYTAB_FILE +fi + +echo "Double check everything is correct, then start nimbus." http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-blobstore-migration/pom.xml b/external/storm-blobstore-migration/pom.xml new file mode 100644 index 0000000..e926045 --- /dev/null +++ b/external/storm-blobstore-migration/pom.xml @@ -0,0 +1,126 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>storm</artifactId> + <groupId>org.apache.storm</groupId> + <version>2.0.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>blobstore-migrator</artifactId> + <packaging>jar</packaging> + + <name>blobstore-migrator</name> + <url>http://maven.apache.org</url> + <dependencies> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-server</artifactId> + <version>${project.version}</version> + <exclusions> + <!--log4j-over-slf4j must be excluded for hadoop-minicluster + see: http://stackoverflow.com/q/20469026/3542091 --> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + 
<groupId>org.apache.storm</groupId> + <artifactId>storm-hdfs</artifactId> + <version>${project.version}</version> + <exclusions> + <!--log4j-over-slf4j must be excluded for hadoop-minicluster + see: http://stackoverflow.com/q/20469026/3542091 --> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <!-- <dependency> --> + <!-- <artifactId>storm-core</artifactId> --> + <!-- <groupId>org.apache.storm</groupId> --> + <!-- <version>0.10.2.y</version> --> + <!-- </dependency> --> + <dependency> + <artifactId>hadoop-hdfs</artifactId> + <groupId>org.apache.hadoop</groupId> + <version>${hdfs.version}</version> + </dependency> + <dependency> + <artifactId>hadoop-client</artifactId> + <groupId>org.apache.hadoop</groupId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <artifactId>hadoop-common</artifactId> + <groupId>org.apache.hadoop</groupId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>yahoo.yinst.storm_hadoop_client_conf</groupId> + <artifactId>storm_hadoop_client_conf</artifactId> + <version>1.0.0.4</version> + </dependency> + </dependencies> + <repositories> + <repository> + <id>central-ymaven</id> + <url>http://ymaven.corp.yahoo.com:9999/proximity/repository/public</url> + <snapshots> + <enabled>true</enabled> + </snapshots> + <releases> + <enabled>true</enabled> + </releases> + </repository> + </repositories> + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.1</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <archive> + <manifest> + <mainClass>org.apache.storm.blobstore.MigratorMain</mainClass> + </manifest> + </archive> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> 
+ <artifactId>maven-shade-plugin</artifactId> + <version>3.0.0</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.storm.blobstore.MigratorMain</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListHDFS.java ---------------------------------------------------------------------- diff --git a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListHDFS.java b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListHDFS.java new file mode 100644 index 0000000..cdf76d0 --- /dev/null +++ b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListHDFS.java @@ -0,0 +1,52 @@ +package org.apache.storm.blobstore; + +import java.util.Map; + +import javax.security.auth.Subject; + +import org.apache.storm.Config; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.hdfs.blobstore.HdfsBlobStore; +import org.apache.storm.hdfs.blobstore.HdfsClientBlobStore; +import org.apache.storm.utils.Utils; + +public class ListHDFS { + + public static void main(String[] args) throws Exception { + if(args.length < 1) { + System.out.println("Need at least 1 argument (hdfs_blobstore_path), but have " + Integer.toString(args.length)); + System.out.println("listHDFS <hdfs_blobstore_path> <hdfs_principal> <keytab>"); + System.out.println("Lists blobs in HdfsBlobStore"); + System.out.println("Example: listHDFS 'hdfs://some-hdfs-namenode:8080/srv/storm/my-storm-blobstore' 
'stormUser/[email protected]' '/srv/my-keytab/stormUser.kt'"); + System.exit(1); + } + + Map<String, Object> hdfsConf = Utils.readStormConfig(); + String hdfsBlobstorePath = args[0]; + + hdfsConf.put(Config.BLOBSTORE_DIR, hdfsBlobstorePath); + hdfsConf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal"); + if(args.length >= 2) { + System.out.println("SETTING HDFS PRINCIPAL!"); + hdfsConf.put(Config.BLOBSTORE_HDFS_PRINCIPAL, args[1]); + } + if(args.length >= 3) { + System.out.println("SETTING HDFS KEYTAB!"); + hdfsConf.put(Config.BLOBSTORE_HDFS_KEYTAB, args[2]); + } + + /* CREATE THE BLOBSTORES */ + System.out.println("Creating HDFS blobstore."); + HdfsBlobStore hdfsBlobStore = new HdfsBlobStore(); + hdfsBlobStore.prepare(hdfsConf, null, null); + System.out.println("Done."); + + /* LOOK AT HDFS BLOBSTORE */ + System.out.println("Listing HDFS blobstore keys."); + MigratorMain.listBlobStoreKeys(hdfsBlobStore, null); + System.out.println("Done."); + + hdfsBlobStore.shutdown(); + System.out.println("Done."); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListLocalFs.java ---------------------------------------------------------------------- diff --git a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListLocalFs.java b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListLocalFs.java new file mode 100644 index 0000000..9a37c12 --- /dev/null +++ b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListLocalFs.java @@ -0,0 +1,42 @@ +package org.apache.storm.blobstore; + +import java.util.Map; + +import javax.security.auth.Subject; + +import org.apache.storm.Config; +import org.apache.storm.blobstore.LocalFsBlobStore; +import org.apache.storm.nimbus.NimbusInfo; +import org.apache.storm.utils.Utils; + +public class ListLocalFs { + + public static void 
main(String[] args) throws Exception { + + if(args.length != 1) { + System.out.println("Need 1 argument, but have " + Integer.toString(args.length)); + System.out.println("listLocalFs <local_blobstore_dir>"); + System.out.println("Lists blobs in LocalFsBlobStore"); + System.out.println("Example: listLocalFs '/srv/storm'"); + System.exit(1); + } + + Map<String, Object> lfsConf = Utils.readStormConfig(); + lfsConf.put(Config.BLOBSTORE_DIR, args[0]); + lfsConf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal"); + + /* CREATE THE BLOBSTORE */ + System.out.println("Creating Local Blobstore."); + LocalFsBlobStore lfsBlobStore = new LocalFsBlobStore(); + lfsBlobStore.prepare(lfsConf, null, NimbusInfo.fromConf(lfsConf)); + System.out.println("Done."); + + /* LOOK AT LOCAL BLOBSTORE */ + System.out.println("Listing Local blobstore keys."); + MigratorMain.listBlobStoreKeys(lfsBlobStore, null); + System.out.println("Done."); + + lfsBlobStore.shutdown(); + System.out.println("Done."); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java ---------------------------------------------------------------------- diff --git a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java new file mode 100644 index 0000000..92ddd29 --- /dev/null +++ b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java @@ -0,0 +1,125 @@ +package org.apache.storm.blobstore; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + +import javax.security.auth.Subject; +import javax.security.auth.login.LoginContext; + +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import 
org.apache.storm.hdfs.blobstore.HdfsBlobStore; +import org.apache.storm.nimbus.NimbusInfo; +import org.apache.storm.utils.Utils; +import org.apache.storm.blobstore.LocalFsBlobStore; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.KeyAlreadyExistsException; +import org.apache.storm.generated.KeyNotFoundException; +import org.apache.storm.generated.ReadableBlobMeta; +import org.apache.storm.generated.SettableBlobMeta; + +public class MigrateBlobs { + + protected static void deleteAllBlobStoreKeys(BlobStore bs, Subject who) throws AuthorizationException, KeyNotFoundException { + Iterable<String> hdfsKeys = () -> bs.listKeys(); + for(String key : hdfsKeys) { + System.out.println(key); + bs.deleteBlob(key, who); + } + } + + protected static void copyBlobStoreKeys(BlobStore bsFrom, Subject whoFrom, BlobStore bsTo, Subject whoTo) throws AuthorizationException, KeyAlreadyExistsException, IOException, KeyNotFoundException { + Iterable<String> lfsKeys = () -> bsFrom.listKeys(); + for(String key : lfsKeys) { + ReadableBlobMeta readable_meta = bsFrom.getBlobMeta(key, whoFrom); + SettableBlobMeta meta = readable_meta.get_settable(); + InputStream in = bsFrom.getBlob(key, whoFrom); + System.out.println("COPYING BLOB " + key + " FROM " + bsFrom + " TO " + bsTo); + bsTo.createBlob(key, in, meta, whoTo); + System.out.println("DONE CREATING BLOB " + key); + } + } + + + public static void main(String[] args) throws Exception { + // TODO Auto-generated method stub + Map<String, Object> hdfsConf = Utils.readStormConfig(); + + if (args.length < 2) { + System.out.println("Need at least 2 arguments, but have " + Integer.toString(args.length)); + System.out.println("migrate <local_blobstore_dir> <hdfs_blobstore_path> <hdfs_principal> <keytab>"); + System.out.println("Migrates blobs from LocalFsBlobStore to HdfsBlobStore"); + System.out.println("Example: migrate '/srv/storm' 'hdfs://some-hdfs-namenode:8080/srv/storm/my-storm-blobstore' 
'stormUser/[email protected]' '/srv/my-keytab/stormUser.kt'"); + System.exit(1); + } + + String localBlobstoreDir = args[0]; + String hdfsBlobstorePath = args[1]; + + hdfsConf.put(Config.BLOBSTORE_DIR, hdfsBlobstorePath); + hdfsConf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal"); + if(args.length >= 3) { + System.out.println("SETTING HDFS PRINCIPAL!"); + hdfsConf.put(Config.BLOBSTORE_HDFS_PRINCIPAL, args[2]); + } + if(args.length >= 4) { + System.out.println("SETTING HDFS KEYTAB!"); + hdfsConf.put(Config.BLOBSTORE_HDFS_KEYTAB, args[3]); + } + hdfsConf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 7); + + Map<String, Object> lfsConf = Utils.readStormConfig(); + lfsConf.put(Config.BLOBSTORE_DIR, localBlobstoreDir); + lfsConf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal"); + + + /* CREATE THE BLOBSTORES */ + System.out.println("Creating Local Blobstore."); + LocalFsBlobStore lfsBlobStore = new LocalFsBlobStore(); + lfsBlobStore.prepare(lfsConf, null, NimbusInfo.fromConf(lfsConf)); + System.out.print("Done."); + + System.out.println("Creating HDFS blobstore."); + HdfsBlobStore hdfsBlobStore = new HdfsBlobStore(); + hdfsBlobStore.prepare(hdfsConf, null, null); + System.out.print("Done."); + + + /* LOOK AT LOCAL BLOBSTORE */ + System.out.println("Listing local blobstore keys."); + MigratorMain.listBlobStoreKeys(lfsBlobStore, null); + System.out.println("Done listing local blobstore keys."); + + /* LOOK AT HDFS BLOBSTORE */ + System.out.println("Listing HDFS blobstore keys."); + MigratorMain.listBlobStoreKeys(hdfsBlobStore, null); + System.out.println("Done listing HDFS blobstore keys."); + + + System.out.println("Going to delete everything in HDFS, then copy all local blobs to HDFS. Continue? [Y/n]"); + String resp = System.console().readLine().toLowerCase().trim(); + if (!(resp.equals("y") || resp.equals(""))) { + System.out.println("Not copying blobs. Exiting. 
[" + resp.toLowerCase().trim() + "]"); + System.exit(1); + } + + /* DELETE EVERYTHING IN HDFS */ + System.out.println("Deleting blobs from HDFS."); + deleteAllBlobStoreKeys(hdfsBlobStore, null); + System.out.println("DONE deleting blobs from HDFS."); + + /* COPY EVERYTHING FROM LOCAL BLOBSTORE TO HDFS */ + System.out.println("Copying local blobstore keys."); + copyBlobStoreKeys(lfsBlobStore, null, hdfsBlobStore, null); + System.out.println("DONE Copying local blobstore keys."); + + /* LOOK AT HDFS BLOBSTORE AGAIN */ + System.out.println("Listing HDFS blobstore keys."); + MigratorMain.listBlobStoreKeys(hdfsBlobStore, null); + System.out.println("Done listing HDFS blobstore keys."); + + hdfsBlobStore.shutdown(); + System.out.println("Done."); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigratorMain.java ---------------------------------------------------------------------- diff --git a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigratorMain.java b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigratorMain.java new file mode 100644 index 0000000..5f8e3fd --- /dev/null +++ b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigratorMain.java @@ -0,0 +1,42 @@ +package org.apache.storm.blobstore; + +import java.util.Arrays; + +import javax.security.auth.Subject; + +public class MigratorMain { + + public static void listBlobStoreKeys(BlobStore bs, Subject who) { + Iterable<String> bsKeys = () -> bs.listKeys(); + for(String key : bsKeys) { + System.out.println(key); + } + } + + private static void usage() { + System.out.println("Commands:"); + System.out.println("\tlistHDFS"); + System.out.println("\tlistLocalFs"); + System.out.println("\tmigrate"); + } + + public static void main(String[] args) throws Exception { + if(args.length == 0) { + usage(); + } + /* chained with else so an empty args array only prints usage and never reaches args[0] */ + else if(args[0].equals("listHDFS")) { 
+ ListHDFS.main(Arrays.copyOfRange(args, 1, args.length)); + } + else if(args[0].equals("listLocalFs")) { + ListLocalFs.main(Arrays.copyOfRange(args, 1, args.length)); + } + else if(args[0].equals("migrate")) { + MigrateBlobs.main(Arrays.copyOfRange(args, 1, args.length)); + } + else { + System.out.println("Not recognized: " + args[0]); + usage(); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/src/test/java/org/apache/storm/blobstore/AppTest.java ---------------------------------------------------------------------- diff --git a/external/storm-blobstore-migration/src/test/java/org/apache/storm/blobstore/AppTest.java b/external/storm-blobstore-migration/src/test/java/org/apache/storm/blobstore/AppTest.java new file mode 100644 index 0000000..c4652bc --- /dev/null +++ b/external/storm-blobstore-migration/src/test/java/org/apache/storm/blobstore/AppTest.java @@ -0,0 +1,38 @@ +package org.apache.storm.blobstore; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. 
+ */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 9b523e6..89dd843 100644 --- a/pom.xml +++ b/pom.xml @@ -366,6 +366,7 @@ <module>external/storm-jms</module> <module>external/storm-pmml</module> <module>external/storm-rocketmq</module> + <module>external/storm-blobstore-migration</module> <module>integration-test</module> <!-- examples -->
