Repository: storm
Updated Branches:
  refs/heads/master 177cb95f4 -> a4fc110f7


Adding storm blobstore migrator.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4af7da0f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4af7da0f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4af7da0f

Branch: refs/heads/master
Commit: 4af7da0ff48bd347aa49509744c0e6a8e749341f
Parents: da2f035
Author: Kyle Nusbaum <[email protected]>
Authored: Tue Sep 26 15:17:32 2017 -0500
Committer: Kyle Nusbaum <[email protected]>
Committed: Tue Sep 26 15:17:32 2017 -0500

----------------------------------------------------------------------
 external/storm-blobstore-migration/Makefile     |  19 +++
 external/storm-blobstore-migration/README.md    |  87 +++++++++++++
 .../storm-blobstore-migration/config.sample     |   8 ++
 external/storm-blobstore-migration/listHDFS.sh  |  11 ++
 external/storm-blobstore-migration/listLocal.sh |  11 ++
 external/storm-blobstore-migration/migrate.sh   |  13 ++
 external/storm-blobstore-migration/pom.xml      | 126 +++++++++++++++++++
 .../org/apache/storm/blobstore/ListHDFS.java    |  52 ++++++++
 .../org/apache/storm/blobstore/ListLocalFs.java |  42 +++++++
 .../apache/storm/blobstore/MigrateBlobs.java    | 125 ++++++++++++++++++
 .../apache/storm/blobstore/MigratorMain.java    |  42 +++++++
 .../org/apache/storm/blobstore/AppTest.java     |  38 ++++++
 pom.xml                                         |   1 +
 13 files changed, 575 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/Makefile
----------------------------------------------------------------------
diff --git a/external/storm-blobstore-migration/Makefile 
b/external/storm-blobstore-migration/Makefile
new file mode 100644
index 0000000..891318c
--- /dev/null
+++ b/external/storm-blobstore-migration/Makefile
@@ -0,0 +1,19 @@
+# Packages the migrator jar, the helper scripts and a VERSION file into
+# blobstore-migrator.tgz (staged under blobstore-migrator/).
+
+.DELETE_ON_ERROR:
+.PHONY: all
+
+PACKAGE_NAME := blobstore-migrator.tgz
+STAGE_DIR := blobstore-migrator
+SCRIPTS := listHDFS.sh listLocal.sh migrate.sh
+
+# Resolved once at parse time: use the VERSION file when it exists, otherwise
+# ask Maven and drop its bracketed log lines.
+VERSION := $(shell cat VERSION 2>/dev/null || mvn help:evaluate -Dexpression=project.version | grep -v '^\[')
+
+# An empty version would produce a bogus jar name and an empty VERSION file.
+ifeq ($(strip $(VERSION)),)
+$(error Unable to determine the project version from the VERSION file or from Maven)
+endif
+
+JAR := target/blobstore-migrator-$(VERSION).jar
+
+# Inputs of the Maven build, so the jar is rebuilt when they change.
+JAR_INPUTS := pom.xml $(shell find src -type f 2>/dev/null)
+
+all: $(PACKAGE_NAME)
+
+# The staging directory is left in place after packaging; it is removed at the
+# start of the next packaging run.
+$(PACKAGE_NAME): VERSION $(JAR) $(SCRIPTS)
+	$(RM) -r $(STAGE_DIR) $@
+	mkdir -p $(STAGE_DIR)
+	cp $(JAR) $(SCRIPTS) VERSION $(STAGE_DIR)/
+	tar -cvzf $@ $(STAGE_DIR)
+
+$(JAR): $(JAR_INPUTS)
+	mvn clean install
+
+# Records the resolved version; the packaged scripts read this file to find the jar.
+VERSION:
+	echo $(VERSION) >$@

http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/README.md
----------------------------------------------------------------------
diff --git a/external/storm-blobstore-migration/README.md 
b/external/storm-blobstore-migration/README.md
new file mode 100644
index 0000000..894b9d6
--- /dev/null
+++ b/external/storm-blobstore-migration/README.md
@@ -0,0 +1,87 @@
+# Blobstore Migrator
+
+## Basic Use
+-----
+
+### Build The Thing
+Use make to build a tarball with everything needed.
+```
+$ make
+```
+
+### Use The Thing
+Copy and extract the tarball
+```
+$ scp blobstore-migrator.tgz my-nimbus-host.example.com:~/
+$ ssh my-nimbus-host.example.com
+... On my-nimbus-host ...
+$ tar -xvzf blobstore-migrator.tgz
+```
+
+This will expand into a blobstore-migrator directory with all the scripts and 
the jar.
+```
+$ cd blobstore-migrator
+$ ls
+blobstore-migrator-2.0.jar  listHDFS.sh  listLocal.sh  migrate.sh  VERSION
+```
+
+To run, first create a config for the cluster.
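+The config must be a file named `config`, located in the directory the scripts are run from (the scripts source it from the current working directory).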
+It must contain definitions for `HDFS_BLOBSTORE_DIR`, `LOCAL_BLOBSTORE_DIR`, 
and `HADOOP_CLASSPATH`.
+Hadoop jars are packaged with neither storm nor this package, so they must be 
installed separately.
+
+Optional configs used to configure security are: `BLOBSTORE_PRINCIPAL`, 
`KEYTAB_FILE`, and `JAAS_CONF`
+
+Example:
+```
+$ cat config
+HDFS_BLOBSTORE_DIR='hdfs://some-hdfs-namenode:8080/srv/storm/my-storm-blobstore'
+LOCAL_BLOBSTORE_DIR='/srv/storm'
+HADOOP_CLASSPATH='/hadoop/share/hdfs/*:/hadoop/common/*'
+
+# My security configs: 
+BLOBSTORE_PRINCIPAL='stormUser/[email protected]'
+KEYTAB_FILE='/srv/my-keytab/stormUser.kt'
+JAAS_CONF='/storm/conf/storm_jaas.conf'
+```
+
+Now you can run any of the scripts, all of which require config to exist:
+ - listHDFS.sh: lists all blobs currently in the HDFS Blobstore
+ - listLocal.sh: lists all blobs currently in the local Blobstore
+ - migrate.sh: Begins the migration process for Nimbus. (Read instructions 
below first)
+ 
+ 
+#### Migrating
+##### Nimbus
+To migrate blobs from nimbus, the following steps are necessary:
+
+1. Shut down Nimbus
+2. Backup storm config
+3. Change the following settings in Nimbus' storm config:
+   * blobstore.dir
+   * blobstore.hdfs.principal
+   * blobstore.hdfs.keytab
+   * blobstore.replication.factor
+   * nimbus.blobstore.class
+4. Configure server so that the environment variable `STORM_EXT_CLASSPATH` 
includes whatever `HADOOP_CLASSPATH` contains when `storm nimbus` is run.
+5. Run the migrate.sh script. It will migrate the blobs from the 
LocalFsBlobStore to the HdfsBlobStore, and then exit.
+6. Double check to make sure the storm configs look sane, and the blobs are 
where they should be. (listHDFS.sh, listLocal.sh)
+
+Once everything looks good, start Nimbus and the Nimbus BlobStore migration 
will be done.
+ 
+If something goes wrong during this process, restore the config that you backed up in step 2 and then start Nimbus. Nimbus will use the Local Blobstore as before.
+
+##### Supervisors
+Supervisors can be upgraded by performing the following steps:
+1. Shut down the supervisor.
+2. Put the following blobstore settings in place:
+   * blobstore.dir
+   * blobstore.hdfs.principal
+   * blobstore.hdfs.keytab
+   * blobstore.replication.factor
+   * supervisor.blobstore.class
+3. Kill all remaining worker processes (this is ugly)
+4. Wipe the local state
+5. Start the supervisor.
+
+The supervisor's local state is wiped because supervisor migration produced spurious errors that were only solved by wiping out the local state. This may not be the best solution, but it does seem to work predictably.

http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/config.sample
----------------------------------------------------------------------
diff --git a/external/storm-blobstore-migration/config.sample 
b/external/storm-blobstore-migration/config.sample
new file mode 100644
index 0000000..b4590ff
--- /dev/null
+++ b/external/storm-blobstore-migration/config.sample
@@ -0,0 +1,8 @@
+HDFS_BLOBSTORE_DIR='hdfs://some-hdfs-namenode:8080/srv/storm/my-storm-blobstore'
+LOCAL_BLOBSTORE_DIR='/srv/storm'
+HADOOP_CLASSPATH='/hadoop-2.6.5/etc/hadoop/:/hadoop-2.6.5/share/hadoop/common/lib/*:/hadoop-2.6.5/share/hadoop/common/*:/hadoop-2.6.5/share/hadoop/hdfs:/hadoop-2.6.5/share/hadoop/hdfs/lib/*:/hadoop-2.6.5/share/hadoop/hdfs/*:/hadoop-2.6.5/share/hadoop/yarn/lib/*:/hadoop-2.6.5/share/hadoop/yarn/*:/hadoop-2.6.5/share/hadoop/mapreduce/lib/*:/hadoop-2.6.5/share/hadoop/mapreduce/*'
+
+## Optional security configs
+BLOBSTORE_PRINCIPAL='stormUser/[email protected]'
+KEYTAB_FILE='/srv/my-keytab/stormUser.kt'
+JAAS_CONF='/storm/conf/storm_jaas.conf'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/listHDFS.sh
----------------------------------------------------------------------
diff --git a/external/storm-blobstore-migration/listHDFS.sh 
b/external/storm-blobstore-migration/listHDFS.sh
new file mode 100755
index 0000000..42c82e6
--- /dev/null
+++ b/external/storm-blobstore-migration/listHDFS.sh
@@ -0,0 +1,11 @@
+#!/usr/bin/env bash
+#
+# Lists all blob keys currently in the HDFS blobstore.
+#
+# Run from the directory that holds `config`, `VERSION` and the migrator jar.
+# `config` must define HDFS_BLOBSTORE_DIR and HADOOP_CLASSPATH;
+# BLOBSTORE_PRINCIPAL, KEYTAB_FILE and JAAS_CONF are optional.
+
+# The explicit ./ makes bash source the local file instead of searching PATH.
+. ./config || exit 1
+VERSION=$(cat VERSION) || exit 1
+MIGRATION_JAR="blobstore-migrator-${VERSION}.jar"
+
+# Only pass a JAAS login config to the JVM when one is configured.
+JVM_OPTS=()
+if [ -n "$JAAS_CONF" ]; then
+    JVM_OPTS+=("-Djava.security.auth.login.config=$JAAS_CONF")
+fi
+
+# Quoting keeps paths with spaces intact and hands the '*' classpath wildcards
+# to java unexpanded; ${VAR:+...} omits an optional argument that is unset or empty.
+java "${JVM_OPTS[@]}" -cp "$HADOOP_CLASSPATH:$MIGRATION_JAR" \
+    org.apache.storm.blobstore.MigratorMain listHDFS "$HDFS_BLOBSTORE_DIR" \
+    ${BLOBSTORE_PRINCIPAL:+"$BLOBSTORE_PRINCIPAL"} ${KEYTAB_FILE:+"$KEYTAB_FILE"}

http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/listLocal.sh
----------------------------------------------------------------------
diff --git a/external/storm-blobstore-migration/listLocal.sh 
b/external/storm-blobstore-migration/listLocal.sh
new file mode 100755
index 0000000..db6cc33
--- /dev/null
+++ b/external/storm-blobstore-migration/listLocal.sh
@@ -0,0 +1,11 @@
+#!/usr/bin/env bash
+#
+# Lists all blob keys currently in the local blobstore.
+#
+# Run from the directory that holds `config`, `VERSION` and the migrator jar.
+# `config` must define LOCAL_BLOBSTORE_DIR and HADOOP_CLASSPATH; JAAS_CONF is
+# optional.
+
+# The explicit ./ makes bash source the local file instead of searching PATH.
+. ./config || exit 1
+VERSION=$(cat VERSION) || exit 1
+MIGRATION_JAR="blobstore-migrator-${VERSION}.jar"
+
+# Only pass a JAAS login config to the JVM when one is configured.
+JVM_OPTS=()
+if [ -n "$JAAS_CONF" ]; then
+    JVM_OPTS+=("-Djava.security.auth.login.config=$JAAS_CONF")
+fi
+
+# Quoting keeps paths with spaces intact and hands the '*' classpath wildcards
+# to java unexpanded.
+java "${JVM_OPTS[@]}" -cp "$HADOOP_CLASSPATH:$MIGRATION_JAR" \
+    org.apache.storm.blobstore.MigratorMain listLocalFs "$LOCAL_BLOBSTORE_DIR"

http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/migrate.sh
----------------------------------------------------------------------
diff --git a/external/storm-blobstore-migration/migrate.sh 
b/external/storm-blobstore-migration/migrate.sh
new file mode 100755
index 0000000..383d05b
--- /dev/null
+++ b/external/storm-blobstore-migration/migrate.sh
@@ -0,0 +1,13 @@
+#!/usr/bin/env bash
+#
+# Copies all blobs from the local blobstore into the HDFS blobstore.
+#
+# Run from the directory that holds `config`, `VERSION` and the migrator jar.
+# `config` must define LOCAL_BLOBSTORE_DIR, HDFS_BLOBSTORE_DIR and
+# HADOOP_CLASSPATH; BLOBSTORE_PRINCIPAL, KEYTAB_FILE and JAAS_CONF are optional.
+
+# The explicit ./ makes bash source the local file instead of searching PATH.
+. ./config || exit 1
+VERSION=$(cat VERSION) || exit 1
+MIGRATION_JAR="blobstore-migrator-${VERSION}.jar"
+
+# Pass the JAAS login config to the JVM only when one IS configured (the
+# condition used to be inverted, so the flag was sent only when it was empty).
+JVM_OPTS=()
+if [ -n "$JAAS_CONF" ]; then
+    JVM_OPTS+=("-Djava.security.auth.login.config=$JAAS_CONF")
+fi
+
+# Quoting keeps paths with spaces intact and hands the '*' classpath wildcards
+# to java unexpanded; ${VAR:+...} omits an optional argument that is unset or empty.
+# Stop with java's exit status on failure so the success hint below is not
+# printed after a failed or declined migration.
+java "${JVM_OPTS[@]}" -cp "$HADOOP_CLASSPATH:$MIGRATION_JAR" \
+    org.apache.storm.blobstore.MigratorMain migrate "$LOCAL_BLOBSTORE_DIR" "$HDFS_BLOBSTORE_DIR" \
+    ${BLOBSTORE_PRINCIPAL:+"$BLOBSTORE_PRINCIPAL"} ${KEYTAB_FILE:+"$KEYTAB_FILE"} || exit $?
+
+echo "Double check everything is correct, then start nimbus."

http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-blobstore-migration/pom.xml 
b/external/storm-blobstore-migration/pom.xml
new file mode 100644
index 0000000..e926045
--- /dev/null
+++ b/external/storm-blobstore-migration/pom.xml
@@ -0,0 +1,126 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <!-- Builds the blobstore-migrator jar. The shade plugin runs in the package
+         phase and records MigratorMain as the manifest main class. -->
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>blobstore-migrator</artifactId>
+    <packaging>jar</packaging>
+
+    <name>blobstore-migrator</name>
+    <url>http://maven.apache.org</url>
+
+    <!-- Only artifacts from the module's normal repositories are listed; no
+         company-internal repository or artifact is required to build. -->
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-server</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <!--log4j-over-slf4j must be excluded for hadoop-minicluster
+                    see: http://stackoverflow.com/q/20469026/3542091 -->
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-hdfs</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <!--log4j-over-slf4j must be excluded for hadoop-minicluster
+                    see: http://stackoverflow.com/q/20469026/3542091 -->
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <artifactId>hadoop-hdfs</artifactId>
+            <groupId>org.apache.hadoop</groupId>
+            <version>${hdfs.version}</version>
+        </dependency>
+        <dependency>
+            <artifactId>hadoop-client</artifactId>
+            <groupId>org.apache.hadoop</groupId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <artifactId>hadoop-common</artifactId>
+            <groupId>org.apache.hadoop</groupId>
+            <version>${hadoop.version}</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <mainClass>org.apache.storm.blobstore.MigratorMain</mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.0.0</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>org.apache.storm.blobstore.MigratorMain</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListHDFS.java
----------------------------------------------------------------------
diff --git 
a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListHDFS.java
 
b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListHDFS.java
new file mode 100644
index 0000000..cdf76d0
--- /dev/null
+++ 
b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListHDFS.java
@@ -0,0 +1,52 @@
+package org.apache.storm.blobstore;
+
+import java.util.Map;
+
+import javax.security.auth.Subject;
+
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.hdfs.blobstore.HdfsBlobStore;
+import org.apache.storm.hdfs.blobstore.HdfsClientBlobStore;
+import org.apache.storm.utils.Utils;
+
+/**
+ * Command-line tool that prints every blob key found in an HdfsBlobStore.
+ */
+public class ListHDFS {
+
+    /**
+     * Prepares an HdfsBlobStore for the given path and prints its keys.
+     *
+     * @param args {@code <hdfs_blobstore_path> [<hdfs_principal> [<keytab>]]}
+     * @throws Exception if reading the config, preparing or listing the blobstore fails
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length < 1) {
+            System.out.println("Need at least 1 argument (hdfs_blobstore_path), but have " + Integer.toString(args.length));
+            System.out.println("listHDFS <hdfs_blobstore_path> <hdfs_principal> <keytab>");
+            System.out.println("Lists blobs in HdfsBlobStore");
+            System.out.println("Example: listHDFS 'hdfs://some-hdfs-namenode:8080/srv/storm/my-storm-blobstore' 'stormUser/[email protected]' '/srv/my-keytab/stormUser.kt'");
+            System.exit(1);
+        }
+
+        Map<String, Object> hdfsConf = Utils.readStormConfig();
+        String hdfsBlobstorePath = args[0];
+
+        hdfsConf.put(Config.BLOBSTORE_DIR, hdfsBlobstorePath);
+        hdfsConf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal");
+        // Principal and keytab are optional positional arguments.
+        if (args.length >= 2) {
+            System.out.println("SETTING HDFS PRINCIPAL!");
+            hdfsConf.put(Config.BLOBSTORE_HDFS_PRINCIPAL, args[1]);
+        }
+        if (args.length >= 3) {
+            System.out.println("SETTING HDFS KEYTAB!");
+            hdfsConf.put(Config.BLOBSTORE_HDFS_KEYTAB, args[2]);
+        }
+
+        /* CREATE THE BLOBSTORE */
+        System.out.println("Creating HDFS blobstore.");
+        HdfsBlobStore hdfsBlobStore = new HdfsBlobStore();
+        hdfsBlobStore.prepare(hdfsConf, null, null);
+        System.out.println("Done.");
+
+        try {
+            /* LOOK AT HDFS BLOBSTORE */
+            System.out.println("Listing HDFS blobstore keys.");
+            MigratorMain.listBlobStoreKeys(hdfsBlobStore, null);
+            System.out.println("Done.");
+        } finally {
+            // Shut the store down even when listing fails.
+            hdfsBlobStore.shutdown();
+        }
+        System.out.println("Done.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListLocalFs.java
----------------------------------------------------------------------
diff --git 
a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListLocalFs.java
 
b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListLocalFs.java
new file mode 100644
index 0000000..9a37c12
--- /dev/null
+++ 
b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/ListLocalFs.java
@@ -0,0 +1,42 @@
+package org.apache.storm.blobstore;
+
+import java.util.Map;
+
+import javax.security.auth.Subject;
+
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.LocalFsBlobStore;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.Utils;
+
+/**
+ * Command-line tool that prints every blob key found in a LocalFsBlobStore.
+ */
+public class ListLocalFs {
+
+    /**
+     * Prepares a LocalFsBlobStore for the given directory and prints its keys.
+     *
+     * @param args {@code <local_blobstore_dir>} (exactly one argument)
+     * @throws Exception if reading the config, preparing or listing the blobstore fails
+     */
+    public static void main(String[] args) throws Exception {
+
+        if (args.length != 1) {
+            System.out.println("Need 1 argument, but have " + Integer.toString(args.length));
+            System.out.println("listLocalFs <local_blobstore_dir>");
+            System.out.println("Lists blobs in LocalFsBlobStore");
+            System.out.println("Example: listLocalFs '/srv/storm'");
+            System.exit(1);
+        }
+
+        Map<String, Object> lfsConf = Utils.readStormConfig();
+        lfsConf.put(Config.BLOBSTORE_DIR, args[0]);
+        lfsConf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal");
+
+        /* CREATE THE BLOBSTORE */
+        System.out.println("Creating Local Blobstore.");
+        LocalFsBlobStore lfsBlobStore = new LocalFsBlobStore();
+        lfsBlobStore.prepare(lfsConf, null, NimbusInfo.fromConf(lfsConf));
+        System.out.println("Done.");
+
+        try {
+            /* LOOK AT LOCAL BLOBSTORE */
+            System.out.println("Listing Local blobstore keys.");
+            MigratorMain.listBlobStoreKeys(lfsBlobStore, null);
+            System.out.println("Done.");
+        } finally {
+            // Shut the store down even when listing fails.
+            lfsBlobStore.shutdown();
+        }
+        System.out.println("Done.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java
----------------------------------------------------------------------
diff --git 
a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java
 
b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java
new file mode 100644
index 0000000..92ddd29
--- /dev/null
+++ 
b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigrateBlobs.java
@@ -0,0 +1,125 @@
+package org.apache.storm.blobstore;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginContext;
+
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.hdfs.blobstore.HdfsBlobStore;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.blobstore.LocalFsBlobStore;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+
+/**
+ * Command-line tool that copies every blob from a LocalFsBlobStore into an
+ * HdfsBlobStore, after first deleting everything the HDFS blobstore holds.
+ */
+public class MigrateBlobs {
+
+    /**
+     * Deletes every key listed by the blobstore, printing each key first.
+     *
+     * @param bs  blobstore to empty
+     * @param who subject passed to each delete call
+     */
+    protected static void deleteAllBlobStoreKeys(BlobStore bs, Subject who) throws AuthorizationException, KeyNotFoundException {
+        Iterable<String> keys = () -> bs.listKeys();
+        for (String key : keys) {
+            System.out.println(key);
+            bs.deleteBlob(key, who);
+        }
+    }
+
+    /**
+     * Copies every blob, together with its settable metadata, from one
+     * blobstore to another.
+     *
+     * @param bsFrom  source blobstore
+     * @param whoFrom subject used for reads from the source
+     * @param bsTo    destination blobstore
+     * @param whoTo   subject used for writes to the destination
+     */
+    protected static void copyBlobStoreKeys(BlobStore bsFrom, Subject whoFrom, BlobStore bsTo, Subject whoTo) throws AuthorizationException, KeyAlreadyExistsException, IOException, KeyNotFoundException {
+        Iterable<String> keys = () -> bsFrom.listKeys();
+        for (String key : keys) {
+            ReadableBlobMeta readableMeta = bsFrom.getBlobMeta(key, whoFrom);
+            SettableBlobMeta meta = readableMeta.get_settable();
+            // Close the source stream even if creating the destination blob fails.
+            try (InputStream in = bsFrom.getBlob(key, whoFrom)) {
+                System.out.println("COPYING BLOB " + key + " FROM " + bsFrom + " TO " + bsTo);
+                bsTo.createBlob(key, in, meta, whoTo);
+            }
+            System.out.println("DONE CREATING BLOB " + key);
+        }
+    }
+
+    /**
+     * Asks on the console whether the destructive migration should proceed.
+     * An answer of "y" or an empty line confirms. Anything else declines, as
+     * does a missing console or end of input, so that a non-interactive run
+     * never deletes data without confirmation.
+     *
+     * @return true only when the user confirmed
+     */
+    private static boolean confirmMigration() {
+        System.out.println("Going to delete everything in HDFS, then copy all local blobs to HDFS. Continue? [Y/n]");
+        // System.console() is null when no interactive console is attached.
+        String line = (System.console() == null) ? null : System.console().readLine();
+        if (line == null) {
+            System.out.println("Not copying blobs. Exiting. [no confirmation could be read]");
+            return false;
+        }
+        String resp = line.toLowerCase().trim();
+        if (resp.equals("y") || resp.equals("")) {
+            return true;
+        }
+        System.out.println("Not copying blobs. Exiting. [" + resp + "]");
+        return false;
+    }
+
+    /**
+     * Runs the migration.
+     *
+     * @param args {@code <local_blobstore_dir> <hdfs_blobstore_path> [<hdfs_principal> [<keytab>]]}
+     * @throws Exception if reading the config or any blobstore operation fails
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length < 2) {
+            System.out.println("Need at least 2 arguments, but have " + Integer.toString(args.length));
+            System.out.println("migrate <local_blobstore_dir> <hdfs_blobstore_path> <hdfs_principal> <keytab>");
+            System.out.println("Migrates blobs from LocalFsBlobStore to HdfsBlobStore");
+            System.out.println("Example: migrate '/srv/storm' 'hdfs://some-hdfs-namenode:8080/srv/storm/my-storm-blobstore' 'stormUser/[email protected]' '/srv/my-keytab/stormUser.kt'");
+            System.exit(1);
+        }
+
+        String localBlobstoreDir = args[0];
+        String hdfsBlobstorePath = args[1];
+
+        Map<String, Object> hdfsConf = Utils.readStormConfig();
+        hdfsConf.put(Config.BLOBSTORE_DIR, hdfsBlobstorePath);
+        hdfsConf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal");
+        // Principal and keytab are optional positional arguments.
+        if (args.length >= 3) {
+            System.out.println("SETTING HDFS PRINCIPAL!");
+            hdfsConf.put(Config.BLOBSTORE_HDFS_PRINCIPAL, args[2]);
+        }
+        if (args.length >= 4) {
+            System.out.println("SETTING HDFS KEYTAB!");
+            hdfsConf.put(Config.BLOBSTORE_HDFS_KEYTAB, args[3]);
+        }
+        // NOTE(review): hard-coded value that replaces whatever the storm config
+        // specifies - confirm this is intended.
+        hdfsConf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 7);
+
+        Map<String, Object> lfsConf = Utils.readStormConfig();
+        lfsConf.put(Config.BLOBSTORE_DIR, localBlobstoreDir);
+        lfsConf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal");
+
+        boolean confirmed;
+
+        /* CREATE THE BLOBSTORES */
+        System.out.println("Creating Local Blobstore.");
+        LocalFsBlobStore lfsBlobStore = new LocalFsBlobStore();
+        lfsBlobStore.prepare(lfsConf, null, NimbusInfo.fromConf(lfsConf));
+        System.out.println("Done.");
+        try {
+            System.out.println("Creating HDFS blobstore.");
+            HdfsBlobStore hdfsBlobStore = new HdfsBlobStore();
+            hdfsBlobStore.prepare(hdfsConf, null, null);
+            System.out.println("Done.");
+            try {
+                /* LOOK AT LOCAL BLOBSTORE */
+                System.out.println("Listing local blobstore keys.");
+                MigratorMain.listBlobStoreKeys(lfsBlobStore, null);
+                System.out.println("Done listing local blobstore keys.");
+
+                /* LOOK AT HDFS BLOBSTORE */
+                System.out.println("Listing HDFS blobstore keys.");
+                MigratorMain.listBlobStoreKeys(hdfsBlobStore, null);
+                System.out.println("Done listing HDFS blobstore keys.");
+
+                confirmed = confirmMigration();
+                if (confirmed) {
+                    /* DELETE EVERYTHING IN HDFS */
+                    System.out.println("Deleting blobs from HDFS.");
+                    deleteAllBlobStoreKeys(hdfsBlobStore, null);
+                    System.out.println("DONE deleting blobs from HDFS.");
+
+                    /* COPY EVERYTHING FROM LOCAL BLOBSTORE TO HDFS */
+                    System.out.println("Copying local blobstore keys.");
+                    copyBlobStoreKeys(lfsBlobStore, null, hdfsBlobStore, null);
+                    System.out.println("DONE Copying local blobstore keys.");
+
+                    /* LOOK AT HDFS BLOBSTORE AGAIN */
+                    System.out.println("Listing HDFS blobstore keys.");
+                    MigratorMain.listBlobStoreKeys(hdfsBlobStore, null);
+                    System.out.println("Done listing HDFS blobstore keys.");
+                }
+            } finally {
+                hdfsBlobStore.shutdown();
+            }
+        } finally {
+            lfsBlobStore.shutdown();
+        }
+
+        // Exit only after both stores are shut down; System.exit would skip the finally blocks.
+        if (!confirmed) {
+            System.exit(1);
+        }
+        System.out.println("Done.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigratorMain.java
----------------------------------------------------------------------
diff --git 
a/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigratorMain.java
 
b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigratorMain.java
new file mode 100644
index 0000000..5f8e3fd
--- /dev/null
+++ 
b/external/storm-blobstore-migration/src/main/java/org/apache/storm/blobstore/MigratorMain.java
@@ -0,0 +1,42 @@
+package org.apache.storm.blobstore;
+
+import java.util.Arrays;
+
+import javax.security.auth.Subject;
+
+/**
+ * Entry point of the migrator jar. The first argument selects the command
+ * (listHDFS, listLocalFs or migrate); the remaining arguments are handed to
+ * that command's own main method.
+ */
+public class MigratorMain {
+
+    /**
+     * Prints every key of the given blobstore to standard output, one per line.
+     *
+     * @param bs  blobstore whose keys are listed
+     * @param who not used by this method
+     */
+    public static void listBlobStoreKeys(BlobStore bs, Subject who) {
+        Iterable<String> bsKeys = () -> bs.listKeys();
+        for (String key : bsKeys) {
+            System.out.println(key);
+        }
+    }
+
+    /** Prints the list of supported commands. */
+    private static void usage() {
+        System.out.println("Commands:");
+        System.out.println("\tlistHDFS");
+        System.out.println("\tlistLocalFs");
+        System.out.println("\tmigrate");
+    }
+
+    /**
+     * Dispatches to the selected command.
+     *
+     * @param args command name followed by that command's arguments
+     * @throws Exception whatever the selected command throws
+     */
+    public static void main(String[] args) throws Exception {
+        // Without this exit the code below would read args[0] of an empty array.
+        if (args.length == 0) {
+            usage();
+            System.exit(1);
+        }
+
+        String command = args[0];
+        String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);
+
+        if (command.equals("listHDFS")) {
+            ListHDFS.main(commandArgs);
+        } else if (command.equals("listLocalFs")) {
+            ListLocalFs.main(commandArgs);
+        } else if (command.equals("migrate")) {
+            MigrateBlobs.main(commandArgs);
+        } else {
+            System.out.println("Not recognized: " + command);
+            usage();
+            // An unknown command is an error, so report it through the exit status.
+            System.exit(1);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/external/storm-blobstore-migration/src/test/java/org/apache/storm/blobstore/AppTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-blobstore-migration/src/test/java/org/apache/storm/blobstore/AppTest.java
 
b/external/storm-blobstore-migration/src/test/java/org/apache/storm/blobstore/AppTest.java
new file mode 100644
index 0000000..c4652bc
--- /dev/null
+++ 
b/external/storm-blobstore-migration/src/test/java/org/apache/storm/blobstore/AppTest.java
@@ -0,0 +1,38 @@
+package org.apache.storm.blobstore;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Placeholder JUnit 3 test case for the blobstore migrator module.
+ */
+public class AppTest extends TestCase {
+
+    /**
+     * Builds the test case.
+     *
+     * @param testName name handed to the TestCase superclass
+     */
+    public AppTest(String testName) {
+        super(testName);
+    }
+
+    /**
+     * @return a suite built from this class
+     */
+    public static Test suite() {
+        return new TestSuite(AppTest.class);
+    }
+
+    /**
+     * Trivial check that always passes.
+     */
+    public void testApp() {
+        assertTrue(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4af7da0f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9b523e6..89dd843 100644
--- a/pom.xml
+++ b/pom.xml
@@ -366,6 +366,7 @@
         <module>external/storm-jms</module>
         <module>external/storm-pmml</module>
         <module>external/storm-rocketmq</module>
+        <module>external/storm-blobstore-migration</module>
         <module>integration-test</module>
 
         <!-- examples -->

Reply via email to