This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git
The following commit(s) were added to refs/heads/master by this push:
new 7a04fda Updated Fluo to build and run with Java 11 (#1078)
7a04fda is described below
commit 7a04fdac2382c2222daa5b9a27115738ff71a9d1
Author: Keith Turner <[email protected]>
AuthorDate: Mon Oct 21 16:14:17 2019 -0400
Updated Fluo to build and run with Java 11 (#1078)
Fluo was not building and running with Java 11. Twill was causing
problems with Java 11. Since Twill was part of the built-in support for
running Fluo in YARN that was deprecated in Fluo 1.2.0, it was removed
along with the deprecated fluo-cluster module, related fluo script
commands, and deprecated fluo properties file. Also, some deprecated
methods in the Fluo configuration API had to be dropped. Because of the
changes to the API and scripts, Fluo's version was bumped to
2.0.0-SNAPSHOT.
Another reason to drop Twill is that it does not seem to support Hadoop 3.
Fluo initialization was assuming the System classloader was a URL
classloader. This assumption is no longer true in Java 11. This code
was updated to use the java system property java.class.path.
Findbugs was not working with Java 11, so this was replaced with
spotbugs in the poms. Some changes were made to fix some issues found by
spotbugs.
* code review updates
---
.travis.yml | 2 +-
modules/accumulo/pom.xml | 2 +-
modules/api/pom.xml | 2 +-
.../apache/fluo/api/config/FluoConfiguration.java | 20 +-
.../java/org/apache/fluo/api/data/RowColumn.java | 2 +-
.../fluo/api/config/FluoConfigurationTest.java | 42 --
modules/cluster/pom.xml | 98 -----
.../apache/fluo/cluster/command/FluoCommand.java | 120 ------
.../fluo/cluster/runnable/OracleRunnable.java | 100 -----
.../fluo/cluster/runnable/WorkerRunnable.java | 114 ------
.../org/apache/fluo/cluster/runner/AppRunner.java | 241 -----------
.../fluo/cluster/runner/ClusterAppRunner.java | 191 ---------
.../apache/fluo/cluster/runner/YarnAppRunner.java | 446 ---------------------
.../org/apache/fluo/cluster/util/ClusterUtil.java | 47 ---
.../org/apache/fluo/cluster/util/FluoInstall.java | 151 -------
.../apache/fluo/cluster/util/FluoYarnConfig.java | 68 ----
.../org/apache/fluo/cluster/util/LogbackUtil.java | 76 ----
.../apache/fluo/cluster/util/ValidateAppName.java | 37 --
.../org/apache/fluo/cluster/yarn/FluoTwillApp.java | 129 ------
.../org/apache/fluo/cluster/yarn/TwillUtil.java | 44 --
modules/cluster/src/main/resources/log4j.xml | 38 --
modules/cluster/src/main/resources/logback.xml | 36 --
modules/command/pom.xml | 2 +-
.../java/org/apache/fluo/command/FluoInit.java | 3 +-
.../src/main/spotbugs}/exclude-filter.xml | 8 +-
modules/core/pom.xml | 2 +-
.../org/apache/fluo/core/client/FluoAdminImpl.java | 34 +-
.../fluo/core/impl/FluoConfigurationImpl.java | 2 +-
.../org/apache/fluo/core/oracle/OracleClient.java | 2 +-
.../org/apache/fluo/core/oracle/OracleServer.java | 5 +-
.../org/apache/fluo/core/thrift/OracleService.java | 2 -
.../java/org/apache/fluo/core/thrift/Stamps.java | 2 -
.../java/org/apache/fluo/core/util/HostUtil.java | 7 +-
.../fluo/core/worker/NotificationProcessor.java | 13 +
.../main/{findbugs => spotbugs}/exclude-filter.xml | 11 +-
modules/distribution/pom.xml | 7 +-
modules/distribution/src/main/assembly/bin.xml | 6 -
modules/distribution/src/main/config/fluo-env.sh | 13 +-
.../src/main/config/fluo.properties.deprecated | 169 --------
modules/distribution/src/main/config/logback.xml | 47 ---
modules/distribution/src/main/lib/fetch.sh | 26 --
modules/distribution/src/main/scripts/fluo | 134 ++-----
modules/integration-tests/pom.xml | 2 +-
.../java/org/apache/fluo/integration/ITBase.java | 4 +-
.../apache/fluo/integration/impl/FailureIT.java | 6 +-
.../org/apache/fluo/integration/impl/MiniIT.java | 2 +-
.../apache/fluo/integration/impl/TransactorIT.java | 6 +-
.../org/apache/fluo/integration/impl/WorkerIT.java | 8 +-
.../src/main/spotbugs}/exclude-filter.xml | 8 +-
modules/mapreduce/pom.xml | 2 +-
.../fluo/mapreduce/it/FluoFileOutputFormatIT.java | 4 +-
modules/mini/pom.xml | 2 +-
.../java/org/apache/fluo/mini/MiniFluoImpl.java | 4 +-
.../src/main/spotbugs}/exclude-filter.xml | 8 +-
pom.xml | 87 ++--
55 files changed, 174 insertions(+), 2470 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 98b2bd5..c5efe12 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -22,7 +22,7 @@ cache:
- $HOME/.m2
install: echo NOOP Skipping pre-fetch of Maven dependencies
jdk:
- - openjdk8
+ - openjdk11
before_script:
- unset _JAVA_OPTIONS
env:
diff --git a/modules/accumulo/pom.xml b/modules/accumulo/pom.xml
index e43759c..0788af1 100644
--- a/modules/accumulo/pom.xml
+++ b/modules/accumulo/pom.xml
@@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-project</artifactId>
- <version>1.3.0-SNAPSHOT</version>
+ <version>2.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>fluo-accumulo</artifactId>
diff --git a/modules/api/pom.xml b/modules/api/pom.xml
index 2e1ad5d..fd35618 100644
--- a/modules/api/pom.xml
+++ b/modules/api/pom.xml
@@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-project</artifactId>
- <version>1.3.0-SNAPSHOT</version>
+ <version>2.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>fluo-api</artifactId>
diff --git
a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
index a241128..3cf944b 100644
---
a/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
+++
b/modules/api/src/main/java/org/apache/fluo/api/config/FluoConfiguration.java
@@ -179,13 +179,6 @@ public class FluoConfiguration extends SimpleConfiguration
{
*/
@Deprecated
public static final String ADMIN_ACCUMULO_TABLE_PROP = ADMIN_PREFIX +
".accumulo.table";
- /**
- * @deprecated since 1.2.0 replaced by fluo.observer.init.dir and
fluo.observer.jars.url
- */
- @Deprecated
- public static final String ADMIN_ACCUMULO_CLASSPATH_PROP = ADMIN_PREFIX +
".accumulo.classpath";
- @Deprecated
- public static final String ADMIN_ACCUMULO_CLASSPATH_DEFAULT = "";
// Worker properties
private static final String WORKER_PREFIX = FLUO_PREFIX + ".worker";
@@ -552,17 +545,6 @@ public class FluoConfiguration extends SimpleConfiguration
{
return getDepNonEmptyString(ACCUMULO_TABLE_PROP,
ADMIN_ACCUMULO_TABLE_PROP);
}
- @Deprecated
- public FluoConfiguration setAccumuloClasspath(String path) {
- setProperty(ADMIN_ACCUMULO_CLASSPATH_PROP,
verifyNotNull(ADMIN_ACCUMULO_CLASSPATH_PROP, path));
- return this;
- }
-
- @Deprecated
- public String getAccumuloClasspath() {
- return getString(ADMIN_ACCUMULO_CLASSPATH_PROP,
ADMIN_ACCUMULO_CLASSPATH_DEFAULT);
- }
-
/**
* Sets paths to jars to provide to Accumulo. If not set, Fluo will find
jars on classpath
* <p>
@@ -1030,7 +1012,7 @@ public class FluoConfiguration extends
SimpleConfiguration {
public boolean hasRequiredAdminProps() {
boolean valid = true;
valid &= hasRequiredClientProps();
- valid &= verifyStringPropSet(ACCUMULO_TABLE_PROP,
ADMIN_ACCUMULO_TABLE_PROP);
+ valid &= verifyStringPropSet(ACCUMULO_TABLE_PROP);
return valid;
}
diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java
b/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java
index 7b2a8c2..53a7f69 100644
--- a/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java
+++ b/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java
@@ -27,7 +27,7 @@ import java.util.Objects;
public final class RowColumn implements Comparable<RowColumn>, Serializable {
private static final long serialVersionUID = 1L;
- public static RowColumn EMPTY = new RowColumn();
+ public static final RowColumn EMPTY = new RowColumn();
private Bytes row = Bytes.EMPTY;
private Column col = Column.EMPTY;
diff --git
a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
index c983bcc..d8c1b10 100644
---
a/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
+++
b/modules/api/src/test/java/org/apache/fluo/api/config/FluoConfigurationTest.java
@@ -49,11 +49,6 @@ public class FluoConfigurationTest {
base.getConnectionRetryTimeout());
Assert.assertEquals(FluoConfiguration.ACCUMULO_ZOOKEEPERS_DEFAULT,
base.getAccumuloZookeepers());
- @SuppressWarnings("deprecation")
- String tmpFieldName = FluoConfiguration.ADMIN_ACCUMULO_CLASSPATH_DEFAULT;
- @SuppressWarnings("deprecation")
- String tmpCP = base.getAccumuloClasspath();
- Assert.assertEquals(tmpFieldName, tmpCP);
Assert.assertEquals(FluoConfiguration.WORKER_NUM_THREADS_DEFAULT,
base.getWorkerThreads());
Assert.assertEquals(FluoConfiguration.TRANSACTION_ROLLBACK_TIME_DEFAULT,
base.getTransactionRollbackTime());
@@ -89,9 +84,6 @@ public class FluoConfigurationTest {
@Test
public void testSetGet() {
FluoConfiguration config = new FluoConfiguration();
- @SuppressWarnings("deprecation")
- String tmpCP =
config.setAccumuloClasspath("path1,path2").getAccumuloClasspath();
- Assert.assertEquals("path1,path2", tmpCP);
Assert.assertEquals("path1,path2",
config.setAccumuloJars("path1,path2").getAccumuloJars());
Assert.assertEquals("instance",
config.setAccumuloInstance("instance").getAccumuloInstance());
Assert.assertEquals("pass",
config.setAccumuloPassword("pass").getAccumuloPassword());
@@ -182,37 +174,6 @@ public class FluoConfigurationTest {
}
@Test
- public void testLoadingOldPropsFile() {
- File propsFile = new
File("../distribution/src/main/config/fluo.properties.deprecated");
- Assert.assertTrue(propsFile.exists());
-
- FluoConfiguration config = new FluoConfiguration(propsFile);
- // make sure classpath contains comma. otherwise it was shortened
- @SuppressWarnings("deprecation")
- String tmpCP = config.getAccumuloClasspath();
- Assert.assertTrue(tmpCP.contains(","));
- // check for values set in prop file
- Assert.assertEquals("localhost/fluo", config.getInstanceZookeepers());
- Assert.assertEquals("localhost", config.getAccumuloZookeepers());
- Assert.assertEquals("", config.getAccumuloPassword());
- try {
- config.getAccumuloUser();
- Assert.fail();
- } catch (IllegalArgumentException e) {
- }
- try {
- String act = config.getAccumuloTable();
- Assert.fail("Saw " + act);
- } catch (IllegalArgumentException e) {
- }
- try {
- config.getAccumuloInstance();
- Assert.fail();
- } catch (IllegalArgumentException e) {
- }
- }
-
- @Test
public void testLoadingDistPropsFile() {
File connectionProps = new
File("../distribution/src/main/config/fluo-conn.properties");
Assert.assertTrue(connectionProps.exists());
@@ -255,9 +216,6 @@ public class FluoConfigurationTest {
FluoConfiguration config = new FluoConfiguration(propsFile);
// make sure classpath contains comma. otherwise it was shortened
- @SuppressWarnings("deprecation")
- String tmpCP = config.getAccumuloClasspath();
- Assert.assertTrue(tmpCP.contains(","));
// check for values set in prop file
Assert.assertEquals("app1", config.getApplicationName());
Assert.assertEquals("localhost/fluo2", config.getInstanceZookeepers());
diff --git a/modules/cluster/pom.xml b/modules/cluster/pom.xml
deleted file mode 100644
index ad07ac1..0000000
--- a/modules/cluster/pom.xml
+++ /dev/null
@@ -1,98 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- agreements. See the NOTICE file distributed with this work for additional
information regarding
- copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
- "License"); you may not use this file except in compliance with the License.
You may obtain a
- copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
distributed under the License
- is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- or implied. See the License for the specific language governing permissions
and limitations under
- the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.fluo</groupId>
- <artifactId>fluo-project</artifactId>
- <version>1.3.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
- <artifactId>fluo-cluster</artifactId>
- <name>Apache Fluo Cluster</name>
- <description>This module contains all code necessary to run Apache Fluo on a
YARN cluster using
- Apache Twill. It was separated from fluo-core to keep dependencies (like
Twill) out of Fluo
- clients which depend on the fluo-core jar. It was also done to limit
conflicts. For example,
- Twill requires logback to be used but fluo-core requires log4j (due to
zookeeper requiring it in
- accumulo-minicluster).</description>
- <dependencies>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-core</artifactId>
- </dependency>
- <dependency>
- <groupId>com.beust</groupId>
- <artifactId>jcommander</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>com.google.inject</groupId>
- <artifactId>guice</artifactId>
- <version>4.2.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.accumulo</groupId>
- <artifactId>accumulo-core</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.fluo</groupId>
- <artifactId>fluo-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.fluo</groupId>
- <artifactId>fluo-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client-api</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.twill</groupId>
- <artifactId>twill-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.twill</groupId>
- <artifactId>twill-yarn</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- </dependencies>
-</project>
diff --git
a/modules/cluster/src/main/java/org/apache/fluo/cluster/command/FluoCommand.java
b/modules/cluster/src/main/java/org/apache/fluo/cluster/command/FluoCommand.java
deleted file mode 100644
index 447257b..0000000
---
a/modules/cluster/src/main/java/org/apache/fluo/cluster/command/FluoCommand.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-
-package org.apache.fluo.cluster.command;
-
-import java.util.Arrays;
-
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.Logger;
-import org.apache.fluo.api.exceptions.FluoException;
-import org.apache.fluo.cluster.runner.YarnAppRunner;
-import org.apache.fluo.cluster.util.FluoInstall;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implementation of Fluo command
- */
-@Deprecated
-public class FluoCommand {
-
- public static void verifyNoArgs(String[] remainArgs) {
- if (remainArgs.length != 0) {
- System.err.println(
- "ERROR - Received unexpected command-line arguments: " +
Arrays.toString(remainArgs));
- System.exit(-1);
- }
- }
-
- public static void main(String[] args) {
-
- if (args.length < 4) {
- System.err.println("ERROR - Expected at least two arguments. "
- + "Usage: FluoCommand <fluoHomeDir> <hadoopPrefix> <command>
<appName> ...");
- System.exit(-1);
- }
-
- String fluoHomeDir = args[0];
- String hadoopPrefix = args[1];
- String command = args[2];
- String appName = args[3];
- String[] remainArgs = Arrays.copyOfRange(args, 4, args.length);
-
- if (command.equalsIgnoreCase("scan")) {
- for (String logger : new String[] {Logger.ROOT_LOGGER_NAME,
"org.apache.fluo"}) {
- ((Logger) LoggerFactory.getLogger(logger)).setLevel(Level.ERROR);
- }
- }
-
- FluoInstall fluoInstall = new FluoInstall(fluoHomeDir);
-
- try (YarnAppRunner runner = new YarnAppRunner(hadoopPrefix)) {
- switch (command.toLowerCase()) {
- case "init":
- runner.init(fluoInstall.getAppConfiguration(appName),
- fluoInstall.getAppPropsPath(appName), remainArgs);
- break;
- case "list":
- verifyNoArgs(remainArgs);
- runner.list(fluoInstall.getFluoConfiguration());
- break;
- case "start":
- verifyNoArgs(remainArgs);
- runner.start(fluoInstall.getAppConfiguration(appName),
fluoInstall.getAppConfDir(appName),
- fluoInstall.getAppLibDir(appName), fluoInstall.getLibDir());
- break;
- case "scan":
- runner.scan(fluoInstall.resolveFluoConfiguration(appName),
remainArgs);
- break;
- case "stop":
- verifyNoArgs(remainArgs);
- runner.stop(fluoInstall.resolveFluoConfiguration(appName));
- break;
- case "kill":
- verifyNoArgs(remainArgs);
- runner.kill(fluoInstall.resolveFluoConfiguration(appName));
- break;
- case "status":
- verifyNoArgs(remainArgs);
- runner.status(fluoInstall.resolveFluoConfiguration(appName), false);
- break;
- case "info":
- verifyNoArgs(remainArgs);
- runner.status(fluoInstall.resolveFluoConfiguration(appName), true);
- break;
- case "wait":
- verifyNoArgs(remainArgs);
-
runner.waitUntilFinished(fluoInstall.resolveFluoConfiguration(appName));
- break;
- case "exec":
- runner.exec(fluoInstall.resolveFluoConfiguration(appName, false),
remainArgs);
- break;
- default:
- System.err.println("Unknown command: " + command);
- break;
- }
- } catch (FluoException e) {
- System.err.println("ERROR - " + e.getMessage());
- System.exit(-1);
- } catch (Exception e) {
- System.err.println("Command failed due to exception below:");
- e.printStackTrace();
- System.exit(-1);
- }
-
- // TODO FLUO-464 - Speed up exit and remove System.exit() below
- System.exit(0);
- }
-}
diff --git
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/OracleRunnable.java
b/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/OracleRunnable.java
deleted file mode 100644
index 99d09a0..0000000
---
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/OracleRunnable.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-
-package org.apache.fluo.cluster.runnable;
-
-import java.io.File;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.service.FluoOracle;
-import org.apache.fluo.core.metrics.MetricNames;
-import org.apache.fluo.core.oracle.FluoOracleImpl;
-import org.apache.fluo.core.util.UtilWaitThread;
-import org.apache.twill.api.AbstractTwillRunnable;
-import org.apache.twill.api.TwillContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Run method of Fluo oracle that is called within a Twill/YARN application
- */
-@Deprecated
-public class OracleRunnable extends AbstractTwillRunnable {
-
- private static final Logger log =
LoggerFactory.getLogger(OracleRunnable.class);
- public static String ORACLE_NAME = "FluoOracle";
- private AtomicBoolean shutdown = new AtomicBoolean(false);
- private static final String STDOUT = "STDOUT";
-
- @Override
- public void run() {
- System.out.println("Starting Oracle");
- String configDir = "./conf";
- String propsPath = configDir + "/fluo.properties";
- Objects.requireNonNull(propsPath);
- File propsFile = new File(propsPath);
- if (!propsFile.exists()) {
- System.err.println("ERROR - Fluo properties file does not exist: " +
propsPath);
- System.exit(-1);
- }
- String logDir = System.getenv("LOG_DIRS");
- if (logDir == null) {
- System.err
- .println("LOG_DIRS env variable was not set by Twill. Logging to
console instead!");
- logDir = STDOUT;
- }
-
- try {
- if (!logDir.equals(STDOUT)) {
- org.apache.fluo.cluster.util.LogbackUtil.init("oracle", configDir,
logDir);
- }
- } catch (Exception e) {
- System.err.println("Exception while starting FluoOracle: " +
e.getMessage());
- e.printStackTrace();
- System.exit(-1);
- }
-
- try {
- FluoConfiguration config = new FluoConfiguration(propsFile);
-
- TwillContext context = getContext();
- if (context != null &&
System.getProperty(MetricNames.METRICS_REPORTER_ID_PROP) == null) {
- System.setProperty(MetricNames.METRICS_REPORTER_ID_PROP,
- "oracle-" + context.getInstanceId());
- }
-
- // FluoFactory cannot be used to create FluoOracle as Twill will not
load its dependencies
- // if it is loaded dynamically
- FluoOracle oracle = new FluoOracleImpl(config);
- oracle.start();
- while (!shutdown.get()) {
- UtilWaitThread.sleep(10000);
- }
- oracle.stop();
- } catch (Exception e) {
- log.error("Exception running FluoOracle: ", e);
- }
-
- log.info("FluoOracle is exiting.");
- }
-
- @Override
- public void stop() {
- log.info("Stopping Fluo oracle");
- shutdown.set(true);
- }
-}
diff --git
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/WorkerRunnable.java
b/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/WorkerRunnable.java
deleted file mode 100644
index dbfd191..0000000
---
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runnable/WorkerRunnable.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-
-package org.apache.fluo.cluster.runnable;
-
-import java.io.File;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.service.FluoWorker;
-import org.apache.fluo.core.metrics.MetricNames;
-import org.apache.fluo.core.util.UtilWaitThread;
-import org.apache.fluo.core.worker.FluoWorkerImpl;
-import org.apache.twill.api.AbstractTwillRunnable;
-import org.apache.twill.api.TwillContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Run method of Fluo worker that is called within a Twill/YARN application
- */
-@Deprecated
-public class WorkerRunnable extends AbstractTwillRunnable {
-
- private static final Logger log =
LoggerFactory.getLogger(WorkerRunnable.class);
- public static String WORKER_NAME = "FluoWorker";
- private AtomicBoolean shutdown = new AtomicBoolean(false);
- private static final String STDOUT = "STDOUT";
-
- @Override
- public void run() {
- System.out.println("Starting Worker");
- String configDir = "./conf";
- String propsPath = configDir + "/fluo.properties";
- Objects.requireNonNull(propsPath);
- File propsFile = new File(propsPath);
- if (!propsFile.exists()) {
- System.err.println("ERROR - Fluo properties file does not exist: " +
propsPath);
- System.exit(-1);
- }
- String logDir = System.getenv("LOG_DIRS");
- if (logDir == null) {
- System.err
- .println("LOG_DIRS env variable was not set by Twill. Logging to
console instead!");
- logDir = STDOUT;
- }
-
- try {
- if (!logDir.equals(STDOUT)) {
- org.apache.fluo.cluster.util.LogbackUtil.init("worker", configDir,
logDir);
- }
- } catch (Exception e) {
- System.err.println("Exception while starting FluoWorker: " +
e.getMessage());
- e.printStackTrace();
- System.exit(-1);
- }
-
- try {
- FluoConfiguration config = new FluoConfiguration(propsFile);
- if (!config.hasRequiredWorkerProps()) {
- log.error("fluo.properties is missing required properties for worker");
- System.exit(-1);
- }
- // any client in worker should retry forever
- config.setConnectionRetryTimeout(-1);
-
- try {
- config.validate();
- } catch (Exception e) {
- System.err.println("Error - Invalid fluo.properties due to " +
e.getMessage());
- e.printStackTrace();
- System.exit(-1);
- }
-
- TwillContext context = getContext();
- if (context != null &&
System.getProperty(MetricNames.METRICS_REPORTER_ID_PROP) == null) {
- System.setProperty(MetricNames.METRICS_REPORTER_ID_PROP,
- "worker-" + context.getInstanceId());
- }
-
- // FluoFactory cannot be used to create FluoWorker as Twill will not
load its dependencies
- // if it is loaded dynamically
- FluoWorker worker = new FluoWorkerImpl(config);
- worker.start();
- while (!shutdown.get()) {
- UtilWaitThread.sleep(1000);
- }
- worker.stop();
- } catch (Exception e) {
- log.error("Exception running FluoWorker: ", e);
- }
-
- log.info("Worker is exiting.");
- }
-
- @Override
- public void stop() {
- log.info("Stopping Fluo worker");
- shutdown.set(true);
- }
-}
diff --git
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
deleted file mode 100644
index 78d301b..0000000
---
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/AppRunner.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-
-package org.apache.fluo.cluster.runner;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-
-import javax.inject.Provider;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParameterException;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.exceptions.FluoException;
-import org.apache.fluo.core.impl.Environment;
-import org.apache.fluo.core.impl.Notification;
-import org.apache.fluo.core.util.ScanUtil;
-import org.apache.fluo.core.util.ScanUtil.ScanFlags;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for running a Fluo application
- */
-@Deprecated
-public abstract class AppRunner {
-
- private static final Logger log = LoggerFactory.getLogger(AppRunner.class);
- private static final long MIN_SLEEP_SEC = 10;
- private static final long MAX_SLEEP_SEC = 300;
-
- private String scriptName;
-
- public AppRunner(String scriptName) {
- this.scriptName = scriptName;
- }
-
- public void scan(FluoConfiguration config, String[] args) {
- ScanOptions options = new ScanOptions();
- JCommander jcommand = new JCommander(options);
- jcommand.setProgramName(scriptName + " scan <app>");
- try {
- jcommand.parse(args);
- } catch (ParameterException e) {
- System.err.println(e.getMessage());
- jcommand.usage();
- System.exit(-1);
- }
-
- if (options.help) {
- jcommand.usage();
- System.exit(0);
- }
-
- try {
- if (options.scanAccumuloTable) {
- ScanUtil.scanAccumulo(options.getScanOpts(), config, System.out);
- } else {
- ScanUtil.scanFluo(options.getScanOpts(), config, System.out);
- }
- } catch (IOException e) {
- System.err.println(e.getMessage());
- System.exit(-1);
- }
- }
-
- private long calculateSleep(long notifyCount, long numWorkers) {
- long sleep = notifyCount / numWorkers / 100;
- if (sleep < MIN_SLEEP_SEC) {
- return MIN_SLEEP_SEC;
- } else if (sleep > MAX_SLEEP_SEC) {
- return MAX_SLEEP_SEC;
- }
- return sleep;
- }
-
- @VisibleForTesting
- public long countNotifications(Environment env) {
- Scanner scanner = null;
- try {
- scanner = env.getAccumuloClient().createScanner(env.getTable(),
env.getAuthorizations());
- } catch (TableNotFoundException e) {
- log.error("An exception was thrown -", e);
- throw new FluoException(e);
- }
-
- Notification.configureScanner(scanner);
-
- return Iterables.size(scanner);
- }
-
- public void waitUntilFinished(FluoConfiguration config) {
- try (Environment env = new Environment(config)) {
- log.info("The wait command will exit when all notifications are
processed");
- while (true) {
- long ts1 =
env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
- long ntfyCount = countNotifications(env);
- long ts2 =
env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
- if (ntfyCount == 0 && ts1 == (ts2 - 1)) {
- log.info("All processing has finished!");
- break;
- }
-
- try {
- long sleepSec = calculateSleep(ntfyCount,
-
org.apache.fluo.cluster.util.FluoYarnConfig.getWorkerInstances(config));
- log.info("{} notifications are still outstanding. Will try again in
{} seconds...",
- ntfyCount, sleepSec);
- Thread.sleep(1000 * sleepSec);
- } catch (InterruptedException e) {
- log.error("Sleep was interrupted! Exiting...");
- System.exit(-1);
- }
- }
- } catch (FluoException e) {
- log.error(e.getMessage());
- System.exit(-1);
- } catch (Exception e) {
- log.error("An exception was thrown -", e);
- System.exit(-1);
- }
- }
-
- private static class FluoConfigModule extends AbstractModule {
-
- private Class<?> clazz;
- private FluoConfiguration fluoConfig;
-
- FluoConfigModule(Class<?> clazz, FluoConfiguration fluoConfig) {
- this.clazz = clazz;
- this.fluoConfig = fluoConfig;
- }
-
- @Override
- protected void configure() {
- requestStaticInjection(clazz);
- bind(FluoConfiguration.class).toProvider((Provider<FluoConfiguration>)
() -> fluoConfig);
- }
- }
-
- public void exec(FluoConfiguration fluoConfig, String[] args) throws
Exception {
-
- String className = args[0];
- Arrays.copyOfRange(args, 1, args.length);
-
- Class<?> clazz = Class.forName(className);
-
- // inject fluo configuration
- Guice.createInjector(new FluoConfigModule(clazz, fluoConfig));
-
- Method method = clazz.getMethod("main", String[].class);
- method.invoke(null, (Object) Arrays.copyOfRange(args, 1, args.length));
- }
-
- public static class ScanOptions {
-
- @Parameter(names = "-s", description = "Start row (inclusive) of scan")
- private String startRow;
-
- @Parameter(names = "-e", description = "End row (inclusive) of scan")
- private String endRow;
-
- @Parameter(names = "-c", description = "Columns of scan in comma separated
format: "
- +
"<<columnfamily>[:<columnqualifier>]{,<columnfamily>[:<columnqualifier>]}> ")
- private List<String> columns;
-
- @Parameter(names = "-r", description = "Exact row to scan")
- private String exactRow;
-
- @Parameter(names = "-p", description = "Row prefix to scan")
- private String rowPrefix;
-
- @Parameter(names = {"-h", "-help", "--help"}, help = true, description =
"Prints help")
- public boolean help;
-
- @Parameter(names = {"-esc", "--escape-non-ascii"}, help = true,
- description = "Hex encode non ascii bytes", arity = 1)
- public boolean hexEncNonAscii = true;
-
- @Parameter(names = "--raw", help = true,
- description = "Show underlying key/values stored in Accumulo.
Interprets the data using Fluo "
- + "internal schema, making it easier to comprehend.")
- public boolean scanAccumuloTable = false;
-
- public String getStartRow() {
- return startRow;
- }
-
- public String getEndRow() {
- return endRow;
- }
-
- public String getExactRow() {
- return exactRow;
- }
-
- public String getRowPrefix() {
- return rowPrefix;
- }
-
- public List<String> getColumns() {
- if (columns == null) {
- return Collections.emptyList();
- }
- return columns;
- }
-
- public ScanUtil.ScanOpts getScanOpts() {
- EnumSet<ScanFlags> flags = EnumSet.noneOf(ScanFlags.class);
-
- ScanUtil.setFlag(flags, help, ScanFlags.HELP);
- ScanUtil.setFlag(flags, hexEncNonAscii, ScanFlags.HEX);
- ScanUtil.setFlag(flags, scanAccumuloTable, ScanFlags.ACCUMULO);
-
- return new ScanUtil.ScanOpts(startRow, endRow, columns, exactRow,
rowPrefix, flags);
- }
- }
-}
diff --git
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/ClusterAppRunner.java
b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/ClusterAppRunner.java
deleted file mode 100644
index b54f04b..0000000
---
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/ClusterAppRunner.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-
-package org.apache.fluo.cluster.runner;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParameterException;
-import org.apache.fluo.api.client.FluoAdmin;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.core.client.FluoAdminImpl;
-
-/**
- * For running Fluo app on cluster
- */
-@Deprecated
-public abstract class ClusterAppRunner extends AppRunner {
-
- public ClusterAppRunner(String scriptName) {
- super(scriptName);
- }
-
- public class InitOptions {
-
- @Parameter(names = {"-f", "--force"},
- description = "Skip all prompts and clears Zookeeper and Accumulo
table. Equivalent to "
- + "setting both --clearTable --clearZookeeper")
- private boolean force;
-
- @Parameter(names = {"--clearTable"}, description = "Skips prompt and
clears Accumulo table")
- private boolean clearTable;
-
- @Parameter(names = {"--clearZookeeper"}, description = "Skips prompt and
clears Zookeeper")
- private boolean clearZookeeper;
-
- @Parameter(names = {"-u", "--update"}, description = "Update Fluo
configuration in Zookeeper")
- private boolean update;
-
- @Parameter(names = {"-h", "-help", "--help"}, help = true, description =
"Prints help")
- public boolean help;
-
- public boolean getForce() {
- return force;
- }
-
- public boolean getClearTable() {
- return clearTable;
- }
-
- public boolean getClearZookeeper() {
- return clearZookeeper;
- }
-
- public boolean getUpdate() {
- return update;
- }
- }
-
- public static boolean readYes() {
- String input = "unk";
- while (true) {
- BufferedReader bufferedReader = new BufferedReader(new
InputStreamReader(System.in));
- try {
- input = bufferedReader.readLine().trim();
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- if (input.equalsIgnoreCase("y")) {
- return true;
- } else if (input.equalsIgnoreCase("n")) {
- return false;
- } else {
- System.out.print("Unexpected input '" + input + "'. Enter y/n or
ctrl-c to abort: ");
- }
- }
- }
-
- public void init(FluoConfiguration config, String propsPath, String[] args) {
- InitOptions commandOpts = new InitOptions();
- JCommander jcommand = new JCommander(commandOpts);
- jcommand.setProgramName("fluo init");
- try {
- jcommand.parse(args);
- } catch (ParameterException e) {
- System.err.println(e.getMessage());
- jcommand.usage();
- System.exit(-1);
- }
-
- if (commandOpts.help) {
- jcommand.usage();
- System.exit(0);
- }
-
- if (!config.hasRequiredAdminProps()) {
- System.err.println("Error - Required properties are not set in " +
propsPath);
- System.exit(-1);
- }
- try {
- config.validate();
- } catch (IllegalArgumentException e) {
- System.err
- .println("Error - Invalid fluo.properties (" + propsPath + ") due to
" + e.getMessage());
- System.exit(-1);
- } catch (Exception e) {
- System.err
- .println("Error - Invalid fluo.properties (" + propsPath + ") due to
" + e.getMessage());
- e.printStackTrace();
- System.exit(-1);
- }
-
- try (FluoAdminImpl admin = new FluoAdminImpl(config)) {
-
- if (admin.applicationRunning()) {
- System.err.println("Error - The Fluo '" + config.getApplicationName()
+ "' application"
- + " is already running and must be stopped before running 'fluo
init'. "
- + " Aborted initialization.");
- System.exit(-1);
- }
-
- FluoAdmin.InitializationOptions initOpts = new
FluoAdmin.InitializationOptions();
-
- if (commandOpts.getUpdate()) {
- System.out.println("Updating configuration for the Fluo '" +
config.getApplicationName()
- + "' application in Zookeeper using " + propsPath);
- admin.updateSharedConfig();
- System.out.println("Update is complete.");
- System.exit(0);
- }
-
- if (commandOpts.getForce()) {
- initOpts.setClearZookeeper(true).setClearTable(true);
- } else {
- if (commandOpts.getClearZookeeper()) {
- initOpts.setClearZookeeper(true);
- } else if (admin.zookeeperInitialized()) {
- System.out.print("A Fluo '" + config.getApplicationName()
- + "' application is already initialized in Zookeeper at " +
config.getAppZookeepers()
- + " - Would you like to clear and reinitialize Zookeeper"
- + " for this application (y/n)? ");
- if (readYes()) {
- initOpts.setClearZookeeper(true);
- } else {
- System.out.println("Aborted initialization.");
- System.exit(-1);
- }
- }
-
- if (commandOpts.getClearTable()) {
- initOpts.setClearTable(true);
- } else if (admin.accumuloTableExists()) {
- System.out.print("The Accumulo table '" + config.getAccumuloTable()
- + "' already exists - Would you like to drop and recreate this
table (y/n)? ");
- if (readYes()) {
- initOpts.setClearTable(true);
- } else {
- System.out.println("Aborted initialization.");
- System.exit(-1);
- }
- }
- }
-
- System.out.println(
- "Initializing Fluo '" + config.getApplicationName() + "' application
using " + propsPath);
- try {
- admin.initialize(initOpts);
- } catch (Exception e) {
- System.out.println("Initialization failed due to the following
exception:");
- e.printStackTrace();
- System.exit(-1);
- }
- System.out.println("Initialization is complete.");
- }
- }
-}
diff --git
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
b/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
deleted file mode 100644
index be89606..0000000
---
a/modules/cluster/src/main/java/org/apache/fluo/cluster/runner/YarnAppRunner.java
+++ /dev/null
@@ -1,446 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-
-package org.apache.fluo.cluster.runner;
-
-import java.io.File;
-import java.net.URI;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.fluo.accumulo.util.ZookeeperPath;
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.exceptions.FluoException;
-import org.apache.fluo.core.client.FluoAdminImpl;
-import org.apache.fluo.core.util.CuratorUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.twill.api.ResourceReport;
-import org.apache.twill.api.TwillController;
-import org.apache.twill.api.TwillPreparer;
-import org.apache.twill.api.TwillRunResources;
-import org.apache.twill.api.TwillRunnerService;
-import org.apache.twill.internal.RunIds;
-import org.apache.twill.yarn.YarnTwillRunnerService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Yarn Implementation of ClusterAppRunner
- */
-@Deprecated
-public class YarnAppRunner extends ClusterAppRunner implements AutoCloseable {
-
- private static final Logger log =
LoggerFactory.getLogger(YarnAppRunner.class);
- private Map<String, TwillRunnerService> twillRunners = new HashMap<>();
- private Map<String, CuratorFramework> curators = new HashMap<>();
- private String hadoopPrefix;
-
- public YarnAppRunner(String hadoopPrefix) {
- super("fluo");
- this.hadoopPrefix = hadoopPrefix;
- }
-
- private synchronized CuratorFramework getAppCurator(FluoConfiguration
config) {
- if (!curators.containsKey(config.getApplicationName())) {
- CuratorFramework curator = CuratorUtil.newAppCurator(config);
- curator.start();
- curators.put(config.getApplicationName(), curator);
- }
- return curators.get(config.getApplicationName());
- }
-
- private synchronized TwillRunnerService getTwillRunner(FluoConfiguration
config) {
- if (!twillRunners.containsKey(config.getApplicationName())) {
- YarnConfiguration yarnConfig = new YarnConfiguration();
- yarnConfig.addResource(new Path(hadoopPrefix +
"/etc/hadoop/core-site.xml"));
- yarnConfig.addResource(new Path(hadoopPrefix +
"/etc/hadoop/yarn-site.xml"));
-
- TwillRunnerService twillRunner =
- new YarnTwillRunnerService(yarnConfig, config.getAppZookeepers() +
ZookeeperPath.TWILL);
- twillRunner.start();
-
- twillRunners.put(config.getApplicationName(), twillRunner);
-
- // sleep to give twill time to retrieve state from zookeeper
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- throw new IllegalStateException(e);
- }
- }
- return twillRunners.get(config.getApplicationName());
- }
-
- private void checkIfInitialized(FluoConfiguration config) {
-
- try (FluoAdminImpl admin = new FluoAdminImpl(config)) {
- if (!admin.zookeeperInitialized()) {
- throw new FluoException("A Fluo '" + config.getApplicationName() + "'
application has not "
- + "been initialized yet in Zookeeper at " +
config.getAppZookeepers());
- }
- }
- }
-
- public void list(FluoConfiguration config) {
- try (CuratorFramework curator = CuratorUtil.newFluoCurator(config)) {
- curator.start();
-
- try {
- if (curator.checkExists().forPath("/") == null) {
- System.out.println("Fluo instance (" +
config.getInstanceZookeepers() + ") has not been "
- + "created yet in Zookeeper. It will be created when the first
Fluo application is "
- + "initialized for this instance.");
- return;
- }
- List<String> children = curator.getChildren().forPath("/");
- if (children.isEmpty()) {
- System.out.println("Fluo instance (" +
config.getInstanceZookeepers() + ") does not "
- + "contain any Fluo applications.");
- return;
- }
- Collections.sort(children);
-
- System.out.println("Fluo instance (" + config.getInstanceZookeepers()
+ ") contains "
- + children.size() + " application(s)\n");
- System.out.println("Application Status YarnAppName
YarnAppId");
- System.out.println("----------- ------ -----------
---------");
- for (String path : children) {
- FluoConfiguration appConfig = new FluoConfiguration(config);
- appConfig.setApplicationName(path);
- String state = "INITIALIZED";
- String yarnId = "not started yet";
- String yarnAppName = getYarnApplicationName(path);
- if (twillIdExists(appConfig)) {
- String twillId = getTwillId(appConfig);
- yarnId = getAppId(appConfig);
- TwillController controller = getTwillRunner(appConfig).lookup(
- getYarnApplicationName(appConfig.getApplicationName()),
RunIds.fromString(twillId));
- if (controller == null) {
- state = "STOPPED";
- } else {
- state = "RUNNING";
- }
- }
- System.out.format("%-15s %-11s %-20s %s\n", path, state,
yarnAppName, yarnId);
- }
- } catch (Exception e) {
- throw new FluoException(e);
- }
- }
- }
-
- public void start(FluoConfiguration config, String appConfDir, String
appLibPath,
- String fluoLibPath) {
-
- checkIfInitialized(config);
-
- if (twillIdExists(config)) {
- String runId = getTwillId(config);
-
- TwillController controller = getTwillRunner(config)
- .lookup(getYarnApplicationName(config.getApplicationName()),
RunIds.fromString(runId));
- if ((controller != null) && isReady(controller)) {
- throw new FluoException(
- "A YARN application " + getAppInfo(config) + " is already running
for the Fluo '"
- + config.getApplicationName() + "' application! Please stop
it using 'fluo stop "
- + config.getApplicationName() + "' before starting a new
one.");
- }
- }
-
- if (!config.hasRequiredOracleProps() || !config.hasRequiredWorkerProps()) {
- throw new FluoException("Failed to start Fluo '" +
config.getApplicationName()
- + "' application because fluo.properties is missing required
properties.");
- }
-
- try {
- config.validate();
- } catch (IllegalArgumentException e) {
- throw new FluoException("Invalid fluo.properties due to " +
e.getMessage());
- } catch (Exception e) {
- throw new FluoException("Invalid fluo.properties due to " +
e.getMessage(), e);
- }
-
- TwillPreparer preparer = getTwillRunner(config)
- .prepare(new org.apache.fluo.cluster.yarn.FluoTwillApp(config,
appConfDir));
-
- // Add jars from fluo lib/ directory that are not being loaded by Twill.
- try {
- File libDir = new File(fluoLibPath);
- File[] libFiles = libDir.listFiles();
- if (libFiles != null) {
- for (File f : libFiles) {
- if (f.isFile()) {
- String jarPath = "file:" + f.getCanonicalPath();
- log.trace("Adding library jar (" + f.getName() + ") to Fluo
application.");
- preparer.withResources(new URI(jarPath));
- }
- }
- }
-
- // Add jars found in application's lib dir
- File appLibDir = new File(appLibPath);
- File[] appFiles = appLibDir.listFiles();
- if (appFiles != null) {
- for (File f : appFiles) {
- String jarPath = "file:" + f.getCanonicalPath();
- log.debug("Adding application jar (" + f.getName() + ") to Fluo
application.");
- preparer.withResources(new URI(jarPath));
- }
- }
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
-
- Objects.requireNonNull(preparer, "Failed to prepare twill application");
-
- log.info("Starting Fluo '{}' application in YARN...",
config.getApplicationName());
- TwillController controller = preparer.start();
-
- try {
- // set twill run id zookeeper
- String twillId = controller.getRunId().toString();
- CuratorUtil.putData(getAppCurator(config), ZookeeperPath.YARN_TWILL_ID,
- twillId.getBytes(StandardCharsets.UTF_8),
CuratorUtil.NodeExistsPolicy.OVERWRITE);
-
- // set app id in zookeeper
- String appId = getResourceReport(controller, -1).getApplicationId();
- Objects.requireNonNull(appId, "Failed to retrieve YARN app ID from
Twill");
- CuratorUtil.putData(getAppCurator(config), ZookeeperPath.YARN_APP_ID,
- appId.getBytes(StandardCharsets.UTF_8),
CuratorUtil.NodeExistsPolicy.OVERWRITE);
-
- log.info("The Fluo '{}' application is running in YARN {}",
config.getApplicationName(),
- getAppInfo(config));
-
- log.info("Waiting for all desired containers to start...");
- int checks = 0;
- while (!allContainersRunning(controller, config)) {
- Thread.sleep(500);
- checks++;
- if (checks == 30) {
- log.warn("Still waiting... YARN may not have enough resources
available for this "
- + "application. Use ctrl-c to stop waiting and check status
using "
- + "'fluo info <app>'.");
- }
- }
- log.info("All desired containers are running in YARN " +
getAppInfo(config));
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- public void stop(FluoConfiguration config) throws InterruptedException,
ExecutionException {
- checkIfInitialized(config);
- String twillId = verifyTwillId(config);
-
- TwillController controller = getTwillRunner(config)
- .lookup(getYarnApplicationName(config.getApplicationName()),
RunIds.fromString(twillId));
- if ((controller != null) && isReady(controller)) {
- System.out.print("Stopping Fluo '" + config.getApplicationName() + "'
application "
- + getAppInfo(config) + "...");
- controller.terminate().get();
- System.out.println("DONE");
- } else {
- System.out.println("Fluo '" + config.getApplicationName() + "'
application "
- + getAppInfo(config) + " is already stopped.");
- }
- }
-
- public void kill(FluoConfiguration config) throws Exception {
- checkIfInitialized(config);
-
- String twillId = verifyTwillId(config);
-
- TwillController controller = getTwillRunner(config)
- .lookup(getYarnApplicationName(config.getApplicationName()),
RunIds.fromString(twillId));
- if (controller != null) {
- System.out.print("Killing Fluo '" + config.getApplicationName() + "'
application "
- + getAppInfo(config) + "...");
- controller.kill();
- System.out.println("DONE");
- } else {
- System.out.println("Fluo '" + config.getApplicationName() + "'
application "
- + getAppInfo(config) + " is already stopped.");
- }
- }
-
- /**
- * Attempts to retrieves ResourceReport until maxWaitMs time is reached. Set
maxWaitMs to -1 to
- * retry forever.
- */
- private ResourceReport getResourceReport(TwillController controller, int
maxWaitMs) {
- ResourceReport report = controller.getResourceReport();
- int elapsed = 0;
- while (report == null) {
- report = controller.getResourceReport();
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- throw new IllegalStateException(e);
- }
- elapsed += 500;
- if ((maxWaitMs != -1) && (elapsed > maxWaitMs)) {
- String msg = String.format("Exceeded max wait time to retrieve
ResourceReport from Twill."
- + " Elapsed time = %s ms", elapsed);
- log.error(msg);
- throw new IllegalStateException(msg);
- }
- if ((elapsed % 10000) == 0) {
- log.info("Waiting for ResourceReport from Twill. Elapsed time = {}
ms", elapsed);
- }
- }
- return report;
- }
-
- private boolean isReady(TwillController controller) {
- try {
- if (getResourceReport(controller, 30000) != null) {
- return true;
- }
- } catch (Exception e) {
- log.error("Exception occurred while getting Twill resource report", e);
- }
- return false;
- }
-
- private boolean allContainersRunning(TwillController controller,
FluoConfiguration config) {
- return org.apache.fluo.cluster.yarn.TwillUtil.numRunning(controller,
- org.apache.fluo.cluster.runnable.OracleRunnable.ORACLE_NAME) ==
org.apache.fluo.cluster.util.FluoYarnConfig
- .getOracleInstances(config)
- && org.apache.fluo.cluster.yarn.TwillUtil.numRunning(controller,
- org.apache.fluo.cluster.runnable.WorkerRunnable.WORKER_NAME) ==
org.apache.fluo.cluster.util.FluoYarnConfig
- .getWorkerInstances(config);
- }
-
- private String containerStatus(TwillController controller, FluoConfiguration
config) {
- return ""
- + org.apache.fluo.cluster.yarn.TwillUtil.numRunning(controller,
- org.apache.fluo.cluster.runnable.OracleRunnable.ORACLE_NAME)
- + " of " +
org.apache.fluo.cluster.util.FluoYarnConfig.getOracleInstances(config)
- + " Oracle containers and "
- + org.apache.fluo.cluster.yarn.TwillUtil.numRunning(controller,
- org.apache.fluo.cluster.runnable.WorkerRunnable.WORKER_NAME)
- + " of " +
org.apache.fluo.cluster.util.FluoYarnConfig.getWorkerInstances(config)
- + " Worker containers";
- }
-
- public void status(FluoConfiguration config, boolean extraInfo) {
- checkIfInitialized(config);
- if (!twillIdExists(config)) {
- System.out.println("Fluo '" + config.getApplicationName()
- + "' application was initialized but has not been started.");
- return;
- }
- String twillId = getTwillId(config);
- TwillController controller = getTwillRunner(config)
- .lookup(getYarnApplicationName(config.getApplicationName()),
RunIds.fromString(twillId));
- if (controller == null) {
- System.out.print("Fluo '" + config.getApplicationName() + "' application
"
- + getAppInfo(config) + " has stopped.");
- } else {
- System.out.println("A Fluo '" + config.getApplicationName() + "'
application is running"
- + " in YARN " + getFullInfo(config));
-
- if (!allContainersRunning(controller, config)) {
- System.out.println("\nWARNING - The Fluo application is not running
all desired "
- + "containers! YARN may not have enough available resources.
Application is "
- + "currently running " + containerStatus(controller, config));
- }
-
- if (extraInfo) {
- ResourceReport report = getResourceReport(controller, 30000);
- Collection<TwillRunResources> resources;
- resources = report
-
.getRunnableResources(org.apache.fluo.cluster.runnable.OracleRunnable.ORACLE_NAME);
- System.out.println("\nThe application has " + resources.size() + " of "
- +
org.apache.fluo.cluster.util.FluoYarnConfig.getOracleInstances(config)
- + " desired Oracle containers:\n");
- org.apache.fluo.cluster.yarn.TwillUtil.printResources(resources);
-
- resources = report
-
.getRunnableResources(org.apache.fluo.cluster.runnable.WorkerRunnable.WORKER_NAME);
- System.out.println("\nThe application has " + resources.size() + " of "
- +
org.apache.fluo.cluster.util.FluoYarnConfig.getWorkerInstances(config)
- + " desired Worker containers:\n");
- org.apache.fluo.cluster.yarn.TwillUtil.printResources(resources);
- }
- }
- }
-
- private String verifyTwillId(FluoConfiguration config) {
- if (!twillIdExists(config)) {
- throw new FluoException("A YARN application is not referenced in
Zookeeper for this "
- + " Fluo application. Check if there is a Fluo application running
in YARN using the "
- + "command 'yarn application -list`. If so, verify that your
fluo.properties is "
- + "configured correctly.");
- }
- return getTwillId(config);
- }
-
- private String getAppInfo(FluoConfiguration config) {
- return "(yarn id = " + getAppId(config) + ")";
- }
-
- private String getFullInfo(FluoConfiguration config) {
- return "(yarn id = " + getAppId(config) + ", twill id = " +
getTwillId(config) + ")";
- }
-
- private boolean twillIdExists(FluoConfiguration config) {
- try {
- return
getAppCurator(config).checkExists().forPath(ZookeeperPath.YARN_TWILL_ID) !=
null;
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- private String getTwillId(FluoConfiguration config) {
- try {
- return new
String(getAppCurator(config).getData().forPath(ZookeeperPath.YARN_TWILL_ID),
- StandardCharsets.UTF_8);
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- private String getAppId(FluoConfiguration config) {
- try {
- return new
String(getAppCurator(config).getData().forPath(ZookeeperPath.YARN_APP_ID),
- StandardCharsets.UTF_8);
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- public static String getYarnApplicationName(String appName) {
- return String.format("fluo-app-%s", appName);
- }
-
- @Override
- public void close() {
- for (TwillRunnerService twillRunner : twillRunners.values()) {
- twillRunner.stop();
- }
- for (CuratorFramework curator : curators.values()) {
- curator.close();
- }
- }
-}
diff --git
a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ClusterUtil.java
b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ClusterUtil.java
deleted file mode 100644
index ae67555..0000000
---
a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ClusterUtil.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-
-package org.apache.fluo.cluster.util;
-
-import java.io.File;
-
-@Deprecated
-public class ClusterUtil {
-
- private ClusterUtil() {}
-
- public static void verifyConfigFilesExist(String configDir, String...
fileNames) {
- for (String fn : fileNames) {
- File f = new File(configDir + "/" + fn);
- if (!f.isFile()) {
- System.out.println("ERROR - This command requires the file 'conf/" + fn
- + "' to be present. It can be created by copying its example from
'conf/examples'.");
- System.exit(-1);
- }
- }
- }
-
- public static void verifyConfigPathsExist(String... paths) {
- for (String path : paths) {
- File f = new File(path);
- if (!f.isFile()) {
- System.out.println("ERROR - This command requires the file '" + path
- + "' to be present. It can be created by copying its example from
'conf/examples'.");
- System.exit(-1);
- }
- }
- }
-
-}
diff --git
a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoInstall.java
b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoInstall.java
deleted file mode 100644
index 70ce7d0..0000000
---
a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoInstall.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-
-package org.apache.fluo.cluster.util;
-
-import java.io.File;
-
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.fluo.api.exceptions.FluoException;
-
-@Deprecated
-public class FluoInstall {
-
- private String fluoHomeDir;
-
- public FluoInstall(String fluoHomeDir) {
- this.fluoHomeDir = fluoHomeDir;
- }
-
- public String getFluoConfDir() {
- return fluoHomeDir + "/conf";
- }
-
- public String getFluoPropsPath() {
- return getFluoConfDir() + "/fluo.properties";
- }
-
- public String getLibDir() {
- return fluoHomeDir + "/lib";
- }
-
- public String getAppsDir() {
- return fluoHomeDir + "/apps";
- }
-
- public String getAppConfDir(String appName) {
- return String.format("%s/%s/conf", getAppsDir(), appName);
- }
-
- public String getAppLibDir(String appName) {
- return String.format("%s/%s/lib", getAppsDir(), appName);
- }
-
- public String getAppPropsPath(String appName) {
- return getAppConfDir(appName) + "/fluo.properties";
- }
-
- public void verifyFluoInstall() {
- verifyInstallPath(fluoHomeDir);
- verifyInstallPath(getLibDir());
- verifyInstallPath(getFluoConfDir());
- verifyInstallPath(getFluoPropsPath());
- }
-
- public void verifyAppInstall(String appName) {
- verifyFluoInstall();
- verifyAppPath(appName, getAppsDir());
- verifyAppPath(appName, getAppConfDir(appName));
- verifyAppPath(appName, getAppPropsPath(appName));
- }
-
- public FluoConfiguration getAppConfiguration(String appName) {
- return getAppConfiguration(appName, true);
- }
-
- public FluoConfiguration getAppConfiguration(String appName, boolean debug) {
- verifyAppInstall(appName);
- String propsPath = getAppPropsPath(appName);
- FluoConfiguration config = new FluoConfiguration(new File(propsPath));
- if (!config.getApplicationName().equals(appName)) {
- throw new FluoException("Application name in config '" +
config.getApplicationName()
- + "' does not match given appName: " + appName);
- }
- if (debug) {
- System.out.println("Connecting to Fluo instance (" +
config.getInstanceZookeepers()
- + ") using config (" + stripFluoHomeDir(propsPath) + ")");
- }
- return config;
- }
-
- public String stripFluoHomeDir(String path) {
- return path.substring(fluoHomeDir.length() + 1);
- }
-
- public FluoConfiguration getFluoConfiguration() {
- return getFluoConfiguration(true);
- }
-
- public FluoConfiguration getFluoConfiguration(boolean debug) {
- verifyFluoInstall();
- String propsPath = getFluoPropsPath();
- FluoConfiguration config = new FluoConfiguration(new File(propsPath));
- if (debug) {
- System.out.println("Connecting to Fluo instance (" +
config.getInstanceZookeepers()
- + ") using config (" + stripFluoHomeDir(propsPath) + ")");
- }
- return config;
- }
-
- public FluoConfiguration resolveFluoConfiguration(String appName) {
- return resolveFluoConfiguration(appName, true);
- }
-
- public FluoConfiguration resolveFluoConfiguration(String appName, boolean
debug) {
- FluoConfiguration config;
- try {
- config = getAppConfiguration(appName, debug);
- } catch (FluoException e) {
- config = new FluoConfiguration(getFluoConfiguration(debug));
- config.setApplicationName(appName);
- }
- return config;
- }
-
- public String resolveFluoPropsPath(String appName) {
- String propsPath;
- try {
- getAppConfiguration(appName, false);
- propsPath = getAppPropsPath(appName);
- } catch (FluoException e) {
- getFluoConfiguration(false);
- propsPath = getFluoPropsPath();
- }
- return propsPath;
- }
-
- private void verifyInstallPath(String path) {
- if (!(new File(path).exists())) {
- throw new FluoException("Path does not exist in Fluo install: " + path);
- }
- }
-
- private void verifyAppPath(String appName, String path) {
- if (!(new File(path).exists())) {
- throw new FluoException(
- "Path does not exist for Fluo '" + appName + "' application: " +
path);
- }
- }
-}
diff --git
a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoYarnConfig.java
b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoYarnConfig.java
deleted file mode 100644
index acc456c..0000000
---
a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/FluoYarnConfig.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-
-package org.apache.fluo.cluster.util;
-
-import com.google.common.base.Preconditions;
-import org.apache.fluo.api.config.FluoConfiguration;
-
-@Deprecated
-public class FluoYarnConfig {
-
- private static final String YARN_PREFIX = FluoConfiguration.FLUO_PREFIX +
".yarn";
- public static final String WORKER_INSTANCES_PROP = YARN_PREFIX +
".worker.instances";
- public static final String WORKER_MAX_MEMORY_MB_PROP = YARN_PREFIX +
".worker.max.memory.mb";
- public static final String WORKER_NUM_CORES_PROP = YARN_PREFIX +
".worker.num.cores";
- public static final int WORKER_INSTANCES_DEFAULT = 1;
- public static final int WORKER_MAX_MEMORY_MB_DEFAULT = 1024;
- public static final int WORKER_NUM_CORES_DEFAULT = 1;
-
- public static final String ORACLE_INSTANCES_PROP = YARN_PREFIX +
".oracle.instances";
- public static final String ORACLE_MAX_MEMORY_MB_PROP = YARN_PREFIX +
".oracle.max.memory.mb";
- public static final String ORACLE_NUM_CORES_PROP = YARN_PREFIX +
".oracle.num.cores";
- public static final int ORACLE_INSTANCES_DEFAULT = 1;
- public static final int ORACLE_MAX_MEMORY_MB_DEFAULT = 512;
- public static final int ORACLE_NUM_CORES_DEFAULT = 1;
-
- public static int getWorkerInstances(FluoConfiguration config) {
- return getPositiveInt(config, WORKER_INSTANCES_PROP,
WORKER_INSTANCES_DEFAULT);
- }
-
- public static int getWorkerMaxMemory(FluoConfiguration config) {
- return getPositiveInt(config, WORKER_MAX_MEMORY_MB_PROP,
WORKER_MAX_MEMORY_MB_DEFAULT);
- }
-
- public static int getWorkerNumCores(FluoConfiguration config) {
- return getPositiveInt(config, WORKER_NUM_CORES_PROP,
WORKER_NUM_CORES_DEFAULT);
- }
-
- public static int getOracleMaxMemory(FluoConfiguration config) {
- return getPositiveInt(config, ORACLE_MAX_MEMORY_MB_PROP,
ORACLE_MAX_MEMORY_MB_DEFAULT);
- }
-
- public static int getOracleInstances(FluoConfiguration config) {
- return getPositiveInt(config, ORACLE_INSTANCES_PROP,
ORACLE_INSTANCES_DEFAULT);
- }
-
- public static int getOracleNumCores(FluoConfiguration config) {
- return getPositiveInt(config, ORACLE_NUM_CORES_PROP,
ORACLE_NUM_CORES_DEFAULT);
- }
-
- private static int getPositiveInt(FluoConfiguration config, String property,
int defaultValue) {
- int value = config.getInt(property, defaultValue);
- Preconditions.checkArgument(value > 0, property + " must be positive");
- return value;
- }
-}
diff --git
a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/LogbackUtil.java
b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/LogbackUtil.java
deleted file mode 100644
index 5394edb..0000000
---
a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/LogbackUtil.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-
-package org.apache.fluo.cluster.util;
-
-import java.io.IOException;
-import java.net.InetAddress;
-
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.joran.JoranConfigurator;
-import ch.qos.logback.core.joran.spi.JoranException;
-import ch.qos.logback.core.util.StatusPrinter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.fluo.api.config.FluoConfiguration.FLUO_PREFIX;
-
-/**
- * Used to initialize Logging for cluster applications
- */
-@Deprecated
-public class LogbackUtil {
-
- private static final Logger log = LoggerFactory.getLogger(LogbackUtil.class);
-
- private static final String FLUO_LOG_APP = FLUO_PREFIX + ".log.app";
- private static final String FLUO_LOG_DIR = FLUO_PREFIX + ".log.dir";
- private static final String FLUO_LOG_HOST = FLUO_PREFIX + ".log.host";
-
- public static void init(String application, String configDir, String logDir)
throws IOException {
-
- String logConfig = String.format("%s/logback.xml", configDir);
- ClusterUtil.verifyConfigPathsExist(logConfig);
-
- System.setProperty(FLUO_LOG_APP, application);
- System.setProperty(FLUO_LOG_DIR, logDir);
-
- String localHostname = InetAddress.getLocalHost().getHostName();
- String instanceId = System.getenv("TWILL_INSTANCE_ID");
- String logHost = localHostname;
- if (instanceId != null) {
- logHost = String.format("%s_%s", instanceId, localHostname);
- }
- System.setProperty(FLUO_LOG_HOST, logHost);
-
- // assume SLF4J is bound to logback in the current environment
- LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
-
- try {
- JoranConfigurator configurator = new JoranConfigurator();
- configurator.setContext(context);
- // Call context.reset() to clear any previous configuration, e.g. default
- // configuration. For multi-step configuration, omit calling
context.reset().
- context.reset();
- configurator.doConfigure(logConfig);
- } catch (JoranException je) {
- // StatusPrinter will handle this
- }
- StatusPrinter.printInCaseOfErrorsOrWarnings(context);
-
- System.out.println("Logging to " + logDir + " using config " + logConfig);
- log.info("Initialized logging using config in " + logConfig);
- }
-}
diff --git
a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ValidateAppName.java
b/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ValidateAppName.java
deleted file mode 100644
index e765819..0000000
---
a/modules/cluster/src/main/java/org/apache/fluo/cluster/util/ValidateAppName.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-
-package org.apache.fluo.cluster.util;
-
-import org.apache.fluo.api.config.FluoConfiguration;
-
-@Deprecated
-public class ValidateAppName {
-
- public static void main(String[] args) {
- if (args.length != 1) {
- System.out.println("ERROR - Expected usage: ValidateAppName
<fluoApplicationName>");
- System.exit(-1);
- }
-
- FluoConfiguration config = new FluoConfiguration();
- try {
- config.setApplicationName(args[0]);
- } catch (IllegalArgumentException e) {
- System.out.println("ERROR - " + e.getMessage());
- System.exit(-1);
- }
- }
-}
diff --git
a/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
b/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
deleted file mode 100644
index eda580a..0000000
---
a/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/FluoTwillApp.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-
-package org.apache.fluo.cluster.yarn;
-
-import java.io.File;
-
-import org.apache.fluo.api.config.FluoConfiguration;
-import org.apache.twill.api.ResourceSpecification;
-import org.apache.twill.api.ResourceSpecification.SizeUnit;
-import org.apache.twill.api.TwillApplication;
-import org.apache.twill.api.TwillSpecification;
-import org.apache.twill.api.TwillSpecification.Builder.LocalFileAdder;
-import org.apache.twill.api.TwillSpecification.Builder.MoreFile;
-import org.apache.twill.api.TwillSpecification.Builder.MoreRunnable;
-import org.apache.twill.api.TwillSpecification.Builder.RunnableSetter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Represents Fluo application in Twill
- */
-@Deprecated
-public class FluoTwillApp implements TwillApplication {
-
- private static final Logger log =
LoggerFactory.getLogger(FluoTwillApp.class);
-
- private final FluoConfiguration config;
- private final String fluoConf;
-
- public FluoTwillApp(FluoConfiguration config, String fluoConf) {
- this.config = config;
- this.fluoConf = fluoConf;
- }
-
- private MoreFile addConfigFiles(LocalFileAdder fileAdder) {
- File confDir = new File(fluoConf);
- MoreFile moreFile = null;
- File[] confFiles = confDir.listFiles();
- if (confFiles != null) {
- for (File f : confFiles) {
- if (f.isFile()) {
- log.trace("Adding config file - " + f.getAbsolutePath());
- if (moreFile == null) {
- moreFile = fileAdder.add(String.format("./conf/%s", f.getName()),
f);
- } else {
- moreFile = moreFile.add(String.format("./conf/%s", f.getName()),
f);
- }
- }
- }
- }
- return moreFile;
- }
-
- @Override
- public TwillSpecification configure() {
-
- final int oracleInstances =
- org.apache.fluo.cluster.util.FluoYarnConfig.getOracleInstances(config);
- final int oracleMaxMemory =
- org.apache.fluo.cluster.util.FluoYarnConfig.getOracleMaxMemory(config);
- final int oracleNumCores =
- org.apache.fluo.cluster.util.FluoYarnConfig.getOracleNumCores(config);
- final int workerInstances =
- org.apache.fluo.cluster.util.FluoYarnConfig.getWorkerInstances(config);
- final int workerMaxMemory =
- org.apache.fluo.cluster.util.FluoYarnConfig.getWorkerMaxMemory(config);
- final int workerNumCores =
- org.apache.fluo.cluster.util.FluoYarnConfig.getWorkerNumCores(config);
-
- log.info(
- "Configuring Fluo '{}' application with {} Oracle instances and {}
Worker instances "
- + "with following properties:",
- config.getApplicationName(), oracleInstances, workerInstances);
-
- log.info("{} = {}",
org.apache.fluo.cluster.util.FluoYarnConfig.ORACLE_MAX_MEMORY_MB_PROP,
- oracleMaxMemory);
- log.info("{} = {}",
org.apache.fluo.cluster.util.FluoYarnConfig.WORKER_MAX_MEMORY_MB_PROP,
- workerMaxMemory);
- log.info("{} = {}",
org.apache.fluo.cluster.util.FluoYarnConfig.ORACLE_NUM_CORES_PROP,
- oracleNumCores);
- log.info("{} = {}",
org.apache.fluo.cluster.util.FluoYarnConfig.WORKER_NUM_CORES_PROP,
- workerNumCores);
-
- // Start building Fluo Twill application
- MoreRunnable moreRunnable =
-
TwillSpecification.Builder.with().setName(org.apache.fluo.cluster.runner.YarnAppRunner
-
.getYarnApplicationName(config.getApplicationName())).withRunnable();
-
- // Configure Oracle(s)
- ResourceSpecification oracleResources =
- ResourceSpecification.Builder.with().setVirtualCores(oracleNumCores)
- .setMemory(oracleMaxMemory,
SizeUnit.MEGA).setInstances(oracleInstances).build();
-
- LocalFileAdder fileAdder = moreRunnable
- .add(org.apache.fluo.cluster.runnable.OracleRunnable.ORACLE_NAME,
- new org.apache.fluo.cluster.runnable.OracleRunnable(),
oracleResources)
- .withLocalFiles();
- RunnableSetter runnableSetter = addConfigFiles(fileAdder).apply();
-
- // Configure Worker(s)
- ResourceSpecification workerResources =
- ResourceSpecification.Builder.with().setVirtualCores(workerNumCores)
- .setMemory(workerMaxMemory,
SizeUnit.MEGA).setInstances(workerInstances).build();
-
- fileAdder = runnableSetter
- .add(org.apache.fluo.cluster.runnable.WorkerRunnable.WORKER_NAME,
- new org.apache.fluo.cluster.runnable.WorkerRunnable(),
workerResources)
- .withLocalFiles();
- runnableSetter = addConfigFiles(fileAdder).apply();
-
- // Set runnable order, build and return TwillSpecification
- return runnableSetter.withOrder()
- .begin(org.apache.fluo.cluster.runnable.OracleRunnable.ORACLE_NAME)
-
.nextWhenStarted(org.apache.fluo.cluster.runnable.WorkerRunnable.WORKER_NAME).build();
- }
-}
diff --git
a/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/TwillUtil.java
b/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/TwillUtil.java
deleted file mode 100644
index 58435a0..0000000
--- a/modules/cluster/src/main/java/org/apache/fluo/cluster/yarn/TwillUtil.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-
-package org.apache.fluo.cluster.yarn;
-
-import java.util.Collection;
-
-import org.apache.twill.api.TwillController;
-import org.apache.twill.api.TwillRunResources;
-
-/**
- * Twill Utility classes
- */
-@Deprecated
-public class TwillUtil {
-
- private TwillUtil() {}
-
- public static int numRunning(TwillController controller, String
runnableName) {
- return
controller.getResourceReport().getRunnableResources(runnableName).size();
- }
-
- public static void printResources(Collection<TwillRunResources>
resourcesList) {
- System.out.println("Instance Cores MaxMemory Container ID
Host");
- System.out.println("-------- ----- --------- ------------
----");
- for (TwillRunResources resources : resourcesList) {
- System.out.format("%-9s %-6s %4s MB %-40s %s\n",
resources.getInstanceId(),
- resources.getVirtualCores(), resources.getMemoryMB(),
resources.getContainerId(),
- resources.getHost());
- }
- }
-}
diff --git a/modules/cluster/src/main/resources/log4j.xml
b/modules/cluster/src/main/resources/log4j.xml
deleted file mode 100644
index 40476d4..0000000
--- a/modules/cluster/src/main/resources/log4j.xml
+++ /dev/null
@@ -1,38 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- agreements. See the NOTICE file distributed with this work for additional
information regarding
- copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
- "License"); you may not use this file except in compliance with the License.
You may obtain a
- copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
distributed under the License
- is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- or implied. See the License for the specific language governing permissions
and limitations under
- the License.
--->
-<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
-<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
-
- <appender name="console" class="org.apache.log4j.ConsoleAppender">
- <param name="Target" value="System.out"/>
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d{ISO8601} [%-8c{2}] %-5p: %m%n"
/>
- </layout>
- </appender>
-
- <logger name="org.apache.zookeeper">
- <level value="ERROR" />
- </logger>
-
- <logger name="org.apache.curator">
- <level value="ERROR" />
- </logger>
-
- <root>
- <level value="INFO" />
- <appender-ref ref="console" />
- </root>
-</log4j:configuration>
diff --git a/modules/cluster/src/main/resources/logback.xml
b/modules/cluster/src/main/resources/logback.xml
deleted file mode 100644
index 0d4d561..0000000
--- a/modules/cluster/src/main/resources/logback.xml
+++ /dev/null
@@ -1,36 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- agreements. See the NOTICE file distributed with this work for additional
information regarding
- copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
- "License"); you may not use this file except in compliance with the License.
You may obtain a
- copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
distributed under the License
- is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- or implied. See the License for the specific language governing permissions
and limitations under
- the License.
--->
-<configuration>
-
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <!-- encoders are assigned the type
- ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
- <encoder>
- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} -
%msg%n</pattern>
- </encoder>
- </appender>
-
- <logger name="org.apache.fluo" level="info"/>
- <logger name="org.apache.zookeeper" level="error"/>
- <logger name="org.apache.curator" level="error"/>
- <logger name="org.apache.hadoop.util.NativeCodeLoader" level="error"/>
- <logger name="org.apache.hadoop.yarn.client.RMProxy" level="warn"/>
- <logger name="org.apache.twill" level="warn"/>
- <logger name="org.apache.twill.yarn.YarnTwillRunnerService" level="error"/>
-
- <root level="info">
- <appender-ref ref="STDOUT" />
- </root>
-</configuration>
diff --git a/modules/command/pom.xml b/modules/command/pom.xml
index 8e382ad..5ad0741 100644
--- a/modules/command/pom.xml
+++ b/modules/command/pom.xml
@@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-project</artifactId>
- <version>1.3.0-SNAPSHOT</version>
+ <version>2.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>fluo-command</artifactId>
diff --git
a/modules/command/src/main/java/org/apache/fluo/command/FluoInit.java
b/modules/command/src/main/java/org/apache/fluo/command/FluoInit.java
index c73d8bc..9eeb5b9 100644
--- a/modules/command/src/main/java/org/apache/fluo/command/FluoInit.java
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoInit.java
@@ -19,6 +19,7 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.util.Optional;
import com.beust.jcommander.Parameter;
import com.google.common.base.Preconditions;
@@ -87,7 +88,7 @@ public class FluoInit {
while (true) {
BufferedReader bufferedReader = new BufferedReader(new
InputStreamReader(System.in));
try {
- input = bufferedReader.readLine().trim();
+ input =
Optional.ofNullable(bufferedReader.readLine()).orElse("").trim();
} catch (IOException e) {
throw new IllegalStateException(e);
}
diff --git a/modules/core/src/main/findbugs/exclude-filter.xml
b/modules/command/src/main/spotbugs/exclude-filter.xml
similarity index 77%
copy from modules/core/src/main/findbugs/exclude-filter.xml
copy to modules/command/src/main/spotbugs/exclude-filter.xml
index 0c14e36..9cfeb5c 100644
--- a/modules/core/src/main/findbugs/exclude-filter.xml
+++ b/modules/command/src/main/spotbugs/exclude-filter.xml
@@ -15,7 +15,9 @@
limitations under the License.
-->
<FindBugsFilter>
- <Match>
- <Class name="~org\.apache\.fluo\.core\.thrift\..*" />
- </Match>
+ <Match>
+ <!-- Must ignore these everywhere, because of a javac byte code generation
bug -->
+ <!-- https://github.com/spotbugs/spotbugs/issues/756 -->
+ <Bug pattern="RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE" />
+ </Match>
</FindBugsFilter>
diff --git a/modules/core/pom.xml b/modules/core/pom.xml
index e1201d3..8f38f5a 100644
--- a/modules/core/pom.xml
+++ b/modules/core/pom.xml
@@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-project</artifactId>
- <version>1.3.0-SNAPSHOT</version>
+ <version>2.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>fluo-core</artifactId>
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
index 8ff40bd..f43e83a 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
@@ -20,9 +20,8 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;
-import java.net.URL;
-import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -173,13 +172,9 @@ public class FluoAdminImpl implements FluoAdmin {
accumuloJars = "";
}
- String accumuloClasspath;
+ String accumuloClasspath = "";
if (!accumuloJars.isEmpty()) {
accumuloClasspath = copyJarsToDfs(accumuloJars, "lib/accumulo");
- } else {
- @SuppressWarnings("deprecation")
- String tmpCP = config.getAccumuloClasspath().trim();
- accumuloClasspath = tmpCP;
}
Map<String, String> ntcProps = new HashMap<>();
@@ -481,23 +476,30 @@ public class FluoAdminImpl implements FluoAdmin {
}
private String getJarsFromClasspath() {
- StringBuilder jars = new StringBuilder();
- ClassLoader cl = FluoAdminImpl.class.getClassLoader();
- URL[] urls = ((URLClassLoader) cl).getURLs();
+
+ String sep = System.getProperty("path.separator");
+ String[] paths = System.getProperty("java.class.path").split("[" + sep +
"]");
String regex =
config.getString(FluoConfigurationImpl.ACCUMULO_JARS_REGEX_PROP,
FluoConfigurationImpl.ACCUMULO_JARS_REGEX_DEFAULT);
Pattern pattern = Pattern.compile(regex);
- for (URL url : urls) {
- String jarName = new File(url.getFile()).getName();
- if (pattern.matcher(jarName).matches()) {
- if (jars.length() != 0) {
- jars.append(",");
+ StringBuilder jars = new StringBuilder();
+ for (String path : paths) {
+ java.nio.file.Path name = Paths.get(path).getFileName();
+ if (name != null) {
+ String jarName = name.toString();
+ if (pattern.matcher(jarName).matches()) {
+ if (jars.length() != 0) {
+ jars.append(",");
+ }
+ jars.append(path);
}
- jars.append(url.getFile());
}
}
+
+ logger.debug("Found Fluo Accumulo jars {} ", jars);
+
return jars.toString();
}
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java
b/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java
index dba18ae..105a697 100644
---
a/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java
+++
b/modules/core/src/main/java/org/apache/fluo/core/impl/FluoConfigurationImpl.java
@@ -47,7 +47,7 @@ public class FluoConfigurationImpl {
// If period is too short, Zookeeper may be overloaded. If too long, garbage
collection
// may keep older versions of table data unnecessarily.
public static final String ZK_UPDATE_PERIOD_PROP = FLUO_IMPL_PREFIX +
".timestamp.update.period";
- public static long ZK_UPDATE_PERIOD_MS_DEFAULT = 60000;
+ public static final long ZK_UPDATE_PERIOD_MS_DEFAULT = 60000;
// CW is short for ConditionalWriter
public static final String CW_MIN_THREADS_PROP = FLUO_IMPL_PREFIX +
".cw.threads.min";
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
index 205634b..cda6e0b 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java
@@ -101,7 +101,7 @@ public class OracleClient implements AutoCloseable {
curatorFramework.start();
while (!cnxnListener.isConnected()) {
- Thread.sleep(200);
+ UtilWaitThread.sleep(200);
}
leaderLatch = new LeaderLatch(curatorFramework,
ZookeeperPath.ORACLE_SERVER);
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
index 7bb6b10..b399159 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/oracle/OracleServer.java
@@ -49,6 +49,7 @@ import org.apache.fluo.core.util.FluoThreadFactory;
import org.apache.fluo.core.util.Halt;
import org.apache.fluo.core.util.HostUtil;
import org.apache.fluo.core.util.PortUtils;
+import org.apache.fluo.core.util.UtilWaitThread;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TCompactProtocol;
@@ -316,7 +317,7 @@ public class OracleServer implements OracleService.Iface,
PathChildrenCacheListe
curatorFramework.start();
while (!cnxnListener.isConnected()) {
- Thread.sleep(200);
+ UtilWaitThread.sleep(200);
}
final InetSocketAddress addr = startServer();
@@ -348,7 +349,7 @@ public class OracleServer implements OracleService.Iface,
PathChildrenCacheListe
pathChildrenCache.start();
while (!cnxnListener.isConnected()) {
- Thread.sleep(200);
+ UtilWaitThread.sleep(200);
}
log.info("Listening " + addr);
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/thrift/OracleService.java
b/modules/core/src/main/java/org/apache/fluo/core/thrift/OracleService.java
index 3ae06af..bce6ffb 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/thrift/OracleService.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/thrift/OracleService.java
@@ -7,8 +7,6 @@
package org.apache.fluo.core.thrift;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)",
- date = "2018-10-30")
public class OracleService {
public interface Iface {
diff --git a/modules/core/src/main/java/org/apache/fluo/core/thrift/Stamps.java
b/modules/core/src/main/java/org/apache/fluo/core/thrift/Stamps.java
index 7df4dd7..6281964 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/thrift/Stamps.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/thrift/Stamps.java
@@ -7,8 +7,6 @@
package org.apache.fluo.core.thrift;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.11.0)",
- date = "2018-10-30")
public class Stamps implements org.apache.thrift.TBase<Stamps, Stamps._Fields>,
java.io.Serializable, Cloneable, Comparable<Stamps> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC =
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/HostUtil.java
b/modules/core/src/main/java/org/apache/fluo/core/util/HostUtil.java
index 3d64a43..597331c 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/HostUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/HostUtil.java
@@ -25,17 +25,16 @@ public class HostUtil {
public static String getHostName() throws IOException {
Process p = Runtime.getRuntime().exec("hostname");
- BufferedReader reader = new BufferedReader(new
InputStreamReader(p.getInputStream()));
- String result = reader.readLine();
- try {
+ try (BufferedReader reader = new BufferedReader(new
InputStreamReader(p.getInputStream()))) {
+ String result = reader.readLine();
int exitCode = p.waitFor();
if (exitCode != 0) {
throw new IOException("Non-zero return from hostname process: " +
result);
}
+ return result;
} catch (InterruptedException e) {
throw new IOException(e);
}
- return result;
}
}
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
index 318c29d..ea452a0 100644
---
a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
+++
b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java
@@ -205,6 +205,19 @@ public class NotificationProcessor implements
AutoCloseable {
}
@Override
+ public boolean equals(Object o) {
+ if (o instanceof FutureNotificationTask) {
+ return compareTo((FutureNotificationTask) o) == 0;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Long.hashCode(notification.getTimestamp());
+ }
+
+ @Override
protected void setException(Throwable t) {
super.setException(t);
System.err.println("Uncaught Exception ");
diff --git a/modules/core/src/main/findbugs/exclude-filter.xml
b/modules/core/src/main/spotbugs/exclude-filter.xml
similarity index 72%
copy from modules/core/src/main/findbugs/exclude-filter.xml
copy to modules/core/src/main/spotbugs/exclude-filter.xml
index 0c14e36..5a558f0 100644
--- a/modules/core/src/main/findbugs/exclude-filter.xml
+++ b/modules/core/src/main/spotbugs/exclude-filter.xml
@@ -15,7 +15,12 @@
limitations under the License.
-->
<FindBugsFilter>
- <Match>
- <Class name="~org\.apache\.fluo\.core\.thrift\..*" />
- </Match>
+ <Match>
+ <Class name="~org\.apache\.fluo\.core\.thrift\..*" />
+ </Match>
+ <Match>
+ <!-- Must ignore these everywhere, because of a javac byte code generation
bug -->
+ <!-- https://github.com/spotbugs/spotbugs/issues/756 -->
+ <Bug pattern="RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE" />
+ </Match>
</FindBugsFilter>
diff --git a/modules/distribution/pom.xml b/modules/distribution/pom.xml
index 4820fc8..b17afb0 100644
--- a/modules/distribution/pom.xml
+++ b/modules/distribution/pom.xml
@@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-project</artifactId>
- <version>1.3.0-SNAPSHOT</version>
+ <version>2.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>fluo</artifactId>
@@ -40,11 +40,6 @@
</dependency>
<dependency>
<groupId>org.apache.fluo</groupId>
- <artifactId>fluo-cluster</artifactId>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.apache.fluo</groupId>
<artifactId>fluo-command</artifactId>
<optional>true</optional>
</dependency>
diff --git a/modules/distribution/src/main/assembly/bin.xml
b/modules/distribution/src/main/assembly/bin.xml
index aa3f16a..9f04d13 100644
--- a/modules/distribution/src/main/assembly/bin.xml
+++ b/modules/distribution/src/main/assembly/bin.xml
@@ -55,7 +55,6 @@
<directory>src/main/config</directory>
<outputDirectory>conf</outputDirectory>
<excludes>
- <exclude>src/main/config/fluo.properties.deprecated</exclude>
<exclude>src/main/config/fluo-app.properties</exclude>
</excludes>
</fileSet>
@@ -85,11 +84,6 @@
</fileSets>
<files>
<file>
- <source>src/main/config/fluo.properties.deprecated</source>
- <outputDirectory>conf</outputDirectory>
- <filtered>true</filtered>
- </file>
- <file>
<source>src/main/config/fluo-app.properties</source>
<outputDirectory>conf</outputDirectory>
<filtered>true</filtered>
diff --git a/modules/distribution/src/main/config/fluo-env.sh
b/modules/distribution/src/main/config/fluo-env.sh
index ea329cf..116f6d2 100755
--- a/modules/distribution/src/main/config/fluo-env.sh
+++ b/modules/distribution/src/main/config/fluo-env.sh
@@ -77,12 +77,7 @@ setupClasspathFromSystem()
test -z "$ZOOKEEPER_HOME" && ZOOKEEPER_HOME=/path/to/zookeeper
CLASSPATH="$lib/*"
- # If fluo-conn.properties exists, then classpath does not need to include
twill or logback
- if [ -f "$FLUO_CONN_PROPS" ]; then
- CLASSPATH="$CLASSPATH:$lib/log4j/*"
- else
- CLASSPATH="$CLASSPATH:$lib/twill/*:$lib/logback/*"
- fi
+ CLASSPATH="$CLASSPATH:$lib/log4j/*"
#any jars matching this pattern is excluded from classpath
EXCLUDE_RE="(.*log4j.*)|(.*asm.*)|(.*guava.*)|(.*gson.*)|(.*hadoop-client-minicluster.*)"
@@ -100,11 +95,7 @@ setupClasspathFromSystem()
# `./lib/fetch.sh ahz` to download dependencies to this directory.
setupClasspathFromLib(){
CLASSPATH="$lib/*"
- if [ -f "$FLUO_CONN_PROPS" ]; then
- CLASSPATH="$CLASSPATH:$lib/log4j/*"
- else
- CLASSPATH="$CLASSPATH:$lib/twill/*:$lib/logback/*"
- fi
+ CLASSPATH="$CLASSPATH:$lib/log4j/*"
CLASSPATH="$CLASSPATH:$lib/ahz/*"
export CLASSPATH
}
diff --git a/modules/distribution/src/main/config/fluo.properties.deprecated
b/modules/distribution/src/main/config/fluo.properties.deprecated
deleted file mode 100644
index 8f675f0..0000000
--- a/modules/distribution/src/main/config/fluo.properties.deprecated
+++ /dev/null
@@ -1,169 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
-# agreements. See the NOTICE file distributed with this work for additional
information regarding
-# copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0
-# (the "License"); you may not use this file except in compliance with the
License. You may
-# obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
distributed under the License
-# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
-# or implied. See the License for the specific language governing permissions
and limitations under
-# the License.
-
-#################
-# Fluo properties
-#################
-
-# NOTE - This file has been deprecated and replaced by fluo-conn.properties &
fluo-app.properties!
-# If you would like to use this file instead, rename it to fluo.properties and
remove fluo-conn.properties.
-#
-# All properties that have a default are set with it. Uncomment
-# a property if you want to use a value different than the default.
-# Properties that have no default are uncommented and must be set by
-# the user. Most are unset except for fluo.accumulo.classpath which
-# has a suggested value.
-
-# Client properties
-# -----------------
-# Fluo application name
-fluo.client.application.name=
-
-# Zookeeper connection string specifying host and chroot where Fluo stores
data.
-# A chroot directory suffix must be specified but doesn't need to be named
-# '/fluo'. If not specified, a Fluo application cannot be initialized.
-# Interpolation (i.e ${fluo.client.accumulo.zookeepers}/fluo) can be used
-# when setting this to reuse Accumulo's zookeeper connection string.
-#fluo.client.zookeeper.connect=localhost/fluo
-
-# Zookeeper timeout
-#fluo.client.zookeeper.timeout=30000
-
-# Accumulo instance to connect to
-fluo.client.accumulo.instance=
-
-# Accumulo user
-fluo.client.accumulo.user=
-
-# Accumulo password
-fluo.client.accumulo.password=
-
-# Accumulo zookeepers
-#fluo.client.accumulo.zookeepers=localhost
-
-# Client retry timeout (in milliseconds). Set to -1 to retry forever.
-#fluo.client.retry.timeout.ms=-1
-
-# Admin properties
-# ----------------
-# Accumulo table to initialize
-fluo.admin.accumulo.table=${fluo.client.application.name}
-
-# HDFS root path. Should match 'fs.defaultFS' property in Hadoop's
core-site.xml
-fluo.admin.hdfs.root=hdfs://localhost:10000
-
-# Fluo uses iterators within Accumulo tablet servers, therefore Accumulo per
-# table classpath need to be configured with a comma seperated list of uris
-# where Accumulo can find Fluo jars. These jars should be reachable from
-# every tablet server. For the default jars below, `fluo init` will place them
-# in HDFS. If you add more jars to the classpath below, you will need to
-# add them to HDFS.
-fluo.admin.accumulo.classpath=${fluo.admin.hdfs.root}/fluo/lib/fluo-api-${project.version}.jar,${fluo.admin.hdfs.root}/fluo/lib/fluo-accumulo-${project.version}.jar
-
-# Observer properties
-# -------------------
-# Specifies an observer provider. This should be the name of a class that
-# implements org.apache.fluo.api.observer.ObserverProvider.
-#fluo.observer.provider=com.foo.AppObserverProvider
-
-# Transaction properties
-# ----------------------
-# Amount of time (in milliseconds) clients wait before rolling back transaction
-#fluo.tx.rollback.time=300000
-
-# Worker properties
-# -----------------
-# Number of threads in each worker instance
-#fluo.worker.num.threads=10
-
-# Loader properties
-# -----------------
-# Number of threads each loader runs. Can set to zero for no threads, thread
-# adding Loader will execute. Must also set fluo.loader.queue.size to zero
-# when setting this to zero.
-#fluo.loader.num.threads=10
-
-# Queue size of loader
-#fluo.loader.queue.size=10
-
-# YARN properties
-# ----------------
-# Number of oracle yarn instances
-#fluo.yarn.oracle.instances=1
-
-# Max memory of Oracle yarn containers (in MB)
-#fluo.yarn.oracle.max.memory.mb=512
-
-# Number of oracle virtual cores
-#fluo.yarn.oracle.num.cores=1
-
-# Number of worker yarn instances
-#fluo.yarn.worker.instances=1
-
-# Max memory of worker YARN containers (in MB). If YARN is killing worker
processes consider
-# increasing twill.java.reserved.memory.mb (which defaults to 200 and is set
in yarn-site.xml).
-# The twill.java.reserved.memory.mb config determines the gap between the YARN
memory limit set
-# below and the java -Xmx setting. For example, if max memory is 1024 and
twill reserved memory
-# is 200, the java -Xmx setting will be 1024-200 = 824 MB.
-#fluo.yarn.worker.max.memory.mb=1024
-
-# Number of worker virtual cores
-#fluo.yarn.worker.num.cores=1
-
-#Metrics
-#------------------
-#Configure reporters for metrics. The frequency for each type of reporter is
in seconds.
-
-#fluo.metrics.reporter.console.enable=false
-#fluo.metrics.reporter.console.target=stdout
-#fluo.metrics.reporter.console.rateUnit=seconds
-#fluo.metrics.reporter.console.durationUnit=milliseconds
-#fluo.metrics.reporter.console.frequency=60
-
-#fluo.metrics.reporter.csv.enable=false
-#fluo.metrics.reporter.csv.dir=/tmp/
-#fluo.metrics.reporter.csv.rateUnit=seconds
-#fluo.metrics.reporter.csv.durationUnit=milliseconds
-#fluo.metrics.reporter.csv.frequency=60
-
-#fluo.metrics.reporter.graphite.enable=false
-#fluo.metrics.reporter.graphite.host=carbon.server.com
-#fluo.metrics.reporter.graphite.port=8080
-#fluo.metrics.reporter.graphite.rateUnit=seconds
-#fluo.metrics.reporter.graphite.durationUnit=milliseconds
-#fluo.metrics.reporter.graphite.frequency=60
-#fluo.metrics.reporter.graphite.prefix=
-
-#fluo.metrics.reporter.jmx.enable=false
-#fluo.metrics.reporter.jmx.rateUnit=seconds
-#fluo.metrics.reporter.jmx.durationUnit=milliseconds
-
-#fluo.metrics.reporter.slf4j.enable=false
-#fluo.metrics.reporter.slf4j.logger=metrics
-#fluo.metrics.reporter.slf4j.rateUnit=seconds
-#fluo.metrics.reporter.slf4j.durationUnit=milliseconds
-
-# MiniFluo properties
-# -------------------
-# Path to directory where MiniFluo stores its data
-#fluo.mini.data.dir=${env:FLUO_HOME}/apps/${fluo.client.application.name}/mini
-
-# Indicates if MiniFluo should start a MiniAccumulo cluster
-#fluo.mini.start.accumulo=true
-
-#Application properties
-#---------------
-#Properties with a prefix of fluo.app are stored in zookeeper at
-#initialization time and can easily be retrieved by a Fluo application running
-#on any node in the cluster.
-#fluo.app.config1=val1
diff --git a/modules/distribution/src/main/config/logback.xml
b/modules/distribution/src/main/config/logback.xml
deleted file mode 100644
index fc75138..0000000
--- a/modules/distribution/src/main/config/logback.xml
+++ /dev/null
@@ -1,47 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- agreements. See the NOTICE file distributed with this work for additional
information regarding
- copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
- "License"); you may not use this file except in compliance with the License.
You may obtain a
- copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
distributed under the License
- is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- or implied. See the License for the specific language governing permissions
and limitations under
- the License.
--->
-
-<!--
-This file configures logging for the Fluo oracle and workers (when using
'fluo' & 'local-fluo' commands)
--->
-<configuration>
-
- <appender name="FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
- <file>${fluo.log.dir}/${fluo.log.app}_${fluo.log.host}.log</file>
- <rollingPolicy
class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
-
<fileNamePattern>${fluo.log.dir}/${fluo.log.app}_${fluo.log.host}.log.%i</fileNamePattern>
- <minIndex>1</minIndex>
- <maxIndex>10</maxIndex>
- </rollingPolicy>
-
- <triggeringPolicy
class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
- <maxFileSize>100MB</maxFileSize>
- </triggeringPolicy>
-
- <encoder>
- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} -
%msg%n</pattern>
- </encoder>
- </appender>
-
- <logger name="org.apache.fluo" level="debug"/>
- <logger name="org.apache.zookeeper" level="error"/>
- <logger name="org.apache.curator" level="error"/>
- <logger name="org.apache.twill" level="warn"/>
- <logger name="org.apache.twill.yarn.YarnTwillRunnerService" level="error"/>
-
- <root level="debug">
- <appender-ref ref="FILE" />
- </root>
-</configuration>
diff --git a/modules/distribution/src/main/lib/fetch.sh
b/modules/distribution/src/main/lib/fetch.sh
index 74320a9..ac69f53 100755
--- a/modules/distribution/src/main/lib/fetch.sh
+++ b/modules/distribution/src/main/lib/fetch.sh
@@ -76,32 +76,6 @@ extra)
download log4j:log4j:jar:1.2.17 ./log4j
download org.slf4j:slf4j-log4j12:jar:1.7.12 ./log4j
- download ch.qos.logback:logback-classic:jar:1.2.3 ./logback
- download ch.qos.logback:logback-core:jar:1.2.3 ./logback
- download org.slf4j:log4j-over-slf4j:jar:1.7.12 ./logback
-
- # Jars for deprecated launching in YARN (in Twill)
- download com.101tec:zkclient:jar:0.3 ./twill
- download com.google.code.findbugs:jsr305:jar:2.0.1 ./twill
- download com.yammer.metrics:metrics-annotation:jar:2.2.0 ./twill
- download com.yammer.metrics:metrics-core:jar:2.2.0 ./twill
- download net.sf.jopt-simple:jopt-simple:jar:3.2 ./twill
- download org.apache.kafka:kafka_2.10:jar:0.8.0 ./twill
- download org.apache.twill:twill-api:jar:0.6.0-incubating ./twill
- download org.apache.twill:twill-common:jar:0.6.0-incubating ./twill
- download org.apache.twill:twill-core:jar:0.6.0-incubating ./twill
- download org.apache.twill:twill-discovery-api:jar:0.6.0-incubating ./twill
- download org.apache.twill:twill-discovery-core:jar:0.6.0-incubating ./twill
- download org.apache.twill:twill-yarn:jar:0.6.0-incubating ./twill
- download org.apache.twill:twill-zookeeper:jar:0.6.0-incubating ./twill
- download org.ow2.asm:asm-all:jar:5.0.2 ./twill
- download org.scala-lang:scala-compiler:jar:2.10.1 ./twill
- download org.scala-lang:scala-library:jar:2.10.1 ./twill
- download org.scala-lang:scala-reflect:jar:2.10.1 ./twill
- download org.xerial.snappy:snappy-java:jar:1.0.5 ./twill
- # See https://github.com/apache/fluo/issues/820
- download io.netty:netty:jar:3.9.9.Final ./twill
-
echo -e "Done!\n"
echo "NOTE - The dependencies downloaded have been tested with some versions
of Hadoop, Zookeeper, and Accumulo."
echo "There is no guarantee they will work with all versions. Fluo chose to
defer dependency resolution to as"
diff --git a/modules/distribution/src/main/scripts/fluo
b/modules/distribution/src/main/scripts/fluo
index 03f6d13..9a9426b 100755
--- a/modules/distribution/src/main/scripts/fluo
+++ b/modules/distribution/src/main/scripts/fluo
@@ -43,8 +43,8 @@ fi
deprecated_fluo_props=$conf/fluo.properties
-if [[ -f "$FLUO_CONN_PROPS" && -f "$deprecated_fluo_props" ]]; then
- echo "Fluo is being configured by $FLUO_CONN_PROPS and
$deprecated_fluo_props. Remove one of these files."
+if [[ -f "$deprecated_fluo_props" ]]; then
+ echo "Fluo no longer supports running applications in YARN using Twill
(Twill does not support Hadoop 3). The presence of the file
$deprecated_fluo_props implies this is desired."
exit 1
fi
@@ -78,14 +78,6 @@ function print_usage {
echo " version Prints the version of Fluo"
echo " wait -a <app> Waits until all notifications are
processed for <app>"
echo " exec <app> <class> {<arg>} Executes <class> with <args> using
classpath for <app>";
-
- echo -e "\nDeprecated commands (available if fluo.properties exists):\n"
- echo " new <app> (Deprecated) Creates configuration for new application
in apps/"
- echo " start <app> (Deprecated) Starts Fluo application on cluster"
- echo " stop <app> (Deprecated) Stops Fluo application on cluster"
- echo " init <app> (Deprecated) Initializes Fluo application using
configuration in apps/<app>/conf/fluo.properties"
- echo " kill <app> (Deprecated) Kills Fluo application on cluster"
- echo " info <app> (Deprecated) Prints information about containers of
Fluo application"
echo " "
exit 1
}
@@ -107,27 +99,6 @@ function verify_app {
fi
}
-function deprecated_verify {
- verify_app "$1"
- APP=$1
- APP_DIR=$basedir/apps/$APP
- APP_CONF_DIR=$APP_DIR/conf
- APP_LIB_DIR=$APP_DIR/lib
- if [ ! -f "$deprecated_fluo_props" ]; then
- echo "ERROR - This command is deprecated can only be used if
fluo.properties exists in $conf"
- exit 1
- fi
-}
-
-function deprecated_verify_full {
- deprecated_verify "$1"
- java org.apache.fluo.cluster.util.ValidateAppName "$APP"
- if [[ ! -d $APP_DIR || ! -d $APP_CONF_DIR || ! -d $APP_LIB_DIR ]]; then
- echo "ERROR - The Fluo '$APP' application needs to be configured in apps/
with a conf/ and lib/ directory. Use 'fluo new $APP' to create this
configuration"
- exit 1
- fi
-}
-
function check_hadoop {
if [[ -z $HADOOP_PREFIX ]]; then
echo "HADOOP_PREFIX needs to be set!"
@@ -167,23 +138,16 @@ get-jars)
$JAVA org.apache.fluo.command.FluoGetJars "${@:2}"
;;
init)
- if [ -f "$FLUO_CONN_PROPS" ]; then
- if [[ $2 = *"-h"* ]]; then
- $JAVA org.apache.fluo.command.FluoInit -h
- exit 0
- fi
- init_dir=$($JAVA org.apache.fluo.command.FluoInit "${@:2}"
--retrieveProperty fluo.observer.init.dir)
- if [ -d "$init_dir" ]; then
- echo "Adding $init_dir/* to CLASSPATH"
- export CLASSPATH="$init_dir/*:$CLASSPATH"
- fi
- $JAVA org.apache.fluo.command.FluoInit "${@:2}"
- else
- deprecated_verify_full "$2"
- check_hadoop
- export CLASSPATH="$APP_LIB_DIR/*:$CLASSPATH"
- java org.apache.fluo.cluster.command.FluoCommand "$basedir"
"$HADOOP_PREFIX" "$@"
+ if [[ $2 = *"-h"* ]]; then
+ $JAVA org.apache.fluo.command.FluoInit -h
+ exit 0
fi
+ init_dir=$($JAVA org.apache.fluo.command.FluoInit "${@:2}"
--retrieveProperty fluo.observer.init.dir)
+ if [ -d "$init_dir" ]; then
+ echo "Adding $init_dir/* to CLASSPATH"
+ export CLASSPATH="$init_dir/*:$CLASSPATH"
+ fi
+ $JAVA org.apache.fluo.command.FluoInit "${@:2}"
;;
remove)
if [[ $2 = *"-h"* ]]; then
@@ -231,77 +195,25 @@ classpath)
echo "$CLASSPATH"
;;
exec)
- if [ -f "$FLUO_CONN_PROPS" ]; then
- app=$2
- verify_app "$app"
- check_conn_props
- # create a temp dir to fetch application jars to
- app_lib=$(mktemp -d "$FLUO_TMP"/fluo-"$app"-XXXXXXXXX) || die "fatal:
unable to allocate a temporary directory"
- # schedule removal of app_lib tmp dir when this script exits
- trap "rm -rf '""$app_lib""'" EXIT HUP INT QUIT TERM
- $JAVA org.apache.fluo.command.FluoGetJars -d "$app_lib" -a "$app"
- export CLASSPATH="$conf:$app_lib/*:$CLASSPATH"
- $JAVA org.apache.fluo.command.FluoExec "${@:2}"
- else
- deprecated_verify "$2"
- export CLASSPATH="$APP_LIB_DIR/*:$CLASSPATH"
- java org.apache.fluo.cluster.command.FluoCommand "$basedir"
"$HADOOP_PREFIX" "$@"
- fi
+ app=$2
+ verify_app "$app"
+ check_conn_props
+ # create a temp dir to fetch application jars to
+ app_lib=$(mktemp -d "$FLUO_TMP"/fluo-"$app"-XXXXXXXXX) || die "fatal: unable
to allocate a temporary directory"
+ # schedule removal of app_lib tmp dir when this script exits
+ trap "rm -rf '""$app_lib""'" EXIT HUP INT QUIT TERM
+ $JAVA org.apache.fluo.command.FluoGetJars -d "$app_lib" -a "$app"
+ export CLASSPATH="$conf:$app_lib/*:$CLASSPATH"
+ $JAVA org.apache.fluo.command.FluoExec "${@:2}"
;;
status)
- if [ -f "$FLUO_CONN_PROPS" ]; then
- $JAVA org.apache.fluo.command.FluoStatus "${@:2}"
- else
- check_hadoop
- java org.apache.fluo.cluster.command.FluoCommand "$basedir"
"$HADOOP_PREFIX" "$@"
- fi
+ $JAVA org.apache.fluo.command.FluoStatus "${@:2}"
;;
version)
echo "$FLUO_VERSION"
;;
wait)
- if [ -f "$FLUO_CONN_PROPS" ]; then
- $JAVA org.apache.fluo.command.FluoWait "${@:2}"
- else
- deprecated_verify "$2"
- export CLASSPATH="$APP_LIB_DIR/*:$CLASSPATH"
- java org.apache.fluo.cluster.command.FluoCommand "$basedir"
"$HADOOP_PREFIX" "$@"
- fi
- ;;
-# Commands below this comment are deprecated
-new)
- deprecated_verify "$2"
- java org.apache.fluo.cluster.util.ValidateAppName "$APP"
- if [ -d "$APP_DIR" ]; then
- echo "The Fluo '$APP' application already has a directory in apps/"
- exit 1
- fi
- mkdir -p "$APP_DIR"
- mkdir -p "$APP_CONF_DIR"
- mkdir -p "$APP_LIB_DIR"
- copy_config fluo.properties
- if [[ "$OSTYPE" == "darwin"* ]]; then
- sed_cmd="sed -i .bak"
- else
- sed_cmd="sed -i"
- fi
- $sed_cmd
"s/fluo.client.application.name=/fluo.client.application.name=$APP/g"
"$APP_CONF_DIR/fluo.properties"
- copy_config logback.xml
- ;;
-start)
- deprecated_verify_full "$2"
- check_hadoop
- export CLASSPATH="$APP_LIB_DIR/*:$CLASSPATH"
- java org.apache.fluo.cluster.command.FluoCommand "$basedir" "$HADOOP_PREFIX"
"$@"
- ;;
-stop)
- check_hadoop
- java org.apache.fluo.cluster.command.FluoCommand "$basedir" "$HADOOP_PREFIX"
"$@"
- ;;
-kill|info)
- deprecated_verify "$2"
- check_hadoop
- java org.apache.fluo.cluster.command.FluoCommand "$basedir" "$HADOOP_PREFIX"
"$@"
+ $JAVA org.apache.fluo.command.FluoWait "${@:2}"
;;
*)
print_usage
diff --git a/modules/integration-tests/pom.xml
b/modules/integration-tests/pom.xml
index c53164e..41cf2a9 100644
--- a/modules/integration-tests/pom.xml
+++ b/modules/integration-tests/pom.xml
@@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-project</artifactId>
- <version>1.3.0-SNAPSHOT</version>
+ <version>2.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>fluo-integration-tests</artifactId>
diff --git
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java
index 70f74be..7dc0c8b 100644
---
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java
+++
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java
@@ -54,8 +54,8 @@ public class ITBase {
protected static FluoConfiguration config;
protected static FluoClient client;
- private static AtomicInteger tableCounter = new AtomicInteger(1);
- protected static AtomicInteger testCounter = new AtomicInteger();
+ private static final AtomicInteger tableCounter = new AtomicInteger(1);
+ protected static final AtomicInteger testCounter = new AtomicInteger();
private static final long JUNIT_TIMEOUT_SECONDS = 120;
diff --git
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java
index 13c316e..6281275 100644
---
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java
+++
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/FailureIT.java
@@ -17,7 +17,6 @@ package org.apache.fluo.integration.impl;
import java.util.Iterator;
import java.util.Map.Entry;
-import java.util.Random;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -234,7 +233,7 @@ public class FailureIT extends ITBaseImpl {
int bobBal = 10;
int joeBal = 20;
- if ((new Random()).nextBoolean()) {
+ if (Math.random() < .5) {
BankUtil.transfer(env, "joe", "jill", 7);
joeBal -= 7;
} else {
@@ -359,7 +358,8 @@ public class FailureIT extends ITBaseImpl {
// test rolling forward primary and non-primary columns
int bobBal = 3;
int joeBal = 27;
- if ((new Random()).nextBoolean()) {
+
+ if (Math.random() < .5) {
BankUtil.transfer(env, "joe", "jill", 2);
joeBal = 25;
} else {
diff --git
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/MiniIT.java
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/MiniIT.java
index 1739e8a..7c070aa 100644
---
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/MiniIT.java
+++
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/MiniIT.java
@@ -46,7 +46,7 @@ public class MiniIT {
if (dataDir.exists()) {
FileUtils.deleteDirectory(dataDir);
}
- dataDir.mkdirs();
+ Assert.assertTrue(dataDir.mkdirs());
try {
FluoConfiguration config = new FluoConfiguration();
config.setApplicationName("mini");
diff --git
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/TransactorIT.java
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/TransactorIT.java
index c810285..61229d5 100644
---
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/TransactorIT.java
+++
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/TransactorIT.java
@@ -33,9 +33,9 @@ public class TransactorIT extends ITBaseImpl {
@Rule
public Timeout globalTimeout = Timeout.seconds(getTestTimeout());
- public static Long id1 = Long.valueOf(2);
- public static Long id2 = Long.valueOf(3);
- public static long NUM_OPEN_TIMEOUT_MS = 1000;
+ public static final Long id1 = Long.valueOf(2);
+ public static final Long id2 = Long.valueOf(3);
+ public static final long NUM_OPEN_TIMEOUT_MS = 1000;
@Rule
public ExpectedException exception = ExpectedException.none();
diff --git
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/WorkerIT.java
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/WorkerIT.java
index f4e3450..03f5fcf 100644
---
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/WorkerIT.java
+++
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/impl/WorkerIT.java
@@ -51,6 +51,10 @@ public class WorkerIT extends ITBaseMini {
private static Column observedColumn = LAST_UPDATE;
+ private static void setObservedColumn(Column c) {
+ observedColumn = c;
+ }
+
public static class DegreeIndexer implements StringObserver {
@Override
@@ -143,7 +147,7 @@ public class WorkerIT extends ITBaseMini {
*/
@Test
public void testDiffObserverConfig() throws Exception {
- observedColumn = new Column("attr2", "lastupdate");
+ setObservedColumn(new Column("attr2", "lastupdate"));
try {
try (Environment env = new Environment(config);
Observers op = env.getConfiguredObservers().getObservers(env)) {
@@ -156,7 +160,7 @@ public class WorkerIT extends ITBaseMini {
Assert.assertTrue(ise.getMessage()
.contains("Column attr2 lastupdate not previously configured for
strong notifications"));
} finally {
- observedColumn = LAST_UPDATE;
+ setObservedColumn(LAST_UPDATE);
}
}
diff --git a/modules/core/src/main/findbugs/exclude-filter.xml
b/modules/integration-tests/src/main/spotbugs/exclude-filter.xml
similarity index 77%
copy from modules/core/src/main/findbugs/exclude-filter.xml
copy to modules/integration-tests/src/main/spotbugs/exclude-filter.xml
index 0c14e36..9cfeb5c 100644
--- a/modules/core/src/main/findbugs/exclude-filter.xml
+++ b/modules/integration-tests/src/main/spotbugs/exclude-filter.xml
@@ -15,7 +15,9 @@
limitations under the License.
-->
<FindBugsFilter>
- <Match>
- <Class name="~org\.apache\.fluo\.core\.thrift\..*" />
- </Match>
+ <Match>
+ <!-- Must ignore these everywhere, because of a javac byte code generation
bug -->
+ <!-- https://github.com/spotbugs/spotbugs/issues/756 -->
+ <Bug pattern="RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE" />
+ </Match>
</FindBugsFilter>
diff --git a/modules/mapreduce/pom.xml b/modules/mapreduce/pom.xml
index b25e273..ef10ca8 100644
--- a/modules/mapreduce/pom.xml
+++ b/modules/mapreduce/pom.xml
@@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-project</artifactId>
- <version>1.3.0-SNAPSHOT</version>
+ <version>2.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>fluo-mapreduce</artifactId>
diff --git
a/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
b/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
index c103567..aa118a6 100644
---
a/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
+++
b/modules/mapreduce/src/test/java/org/apache/fluo/mapreduce/it/FluoFileOutputFormatIT.java
@@ -68,10 +68,10 @@ public class FluoFileOutputFormatIT extends ITBaseImpl {
public void testImportFile() throws Exception {
File inDir = new File(tempFolder.getRoot(), "in");
- inDir.mkdir();
+ Assert.assertTrue(inDir.mkdir());
File outDir = new File(tempFolder.getRoot(), "out");
File failDir = new File(tempFolder.getRoot(), "fail");
- failDir.mkdir();
+ Assert.assertTrue(failDir.mkdir());
// generate some data for map reduce to read
PrintWriter writer =
diff --git a/modules/mini/pom.xml b/modules/mini/pom.xml
index 5e0fcab..e956e6c 100644
--- a/modules/mini/pom.xml
+++ b/modules/mini/pom.xml
@@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-project</artifactId>
- <version>1.3.0-SNAPSHOT</version>
+ <version>2.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>fluo-mini</artifactId>
diff --git a/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
b/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
index dc490fa..5a9f0ca 100644
--- a/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
+++ b/modules/mini/src/main/java/org/apache/fluo/mini/MiniFluoImpl.java
@@ -59,8 +59,8 @@ public class MiniFluoImpl implements MiniFluo {
private FluoConfiguration config;
private MiniAccumuloCluster cluster = null;
- protected static String USER = "root";
- protected static String PASSWORD = "secret";
+ protected static final String USER = "root";
+ protected static final String PASSWORD = "secret";
private AutoCloseable reporter;
diff --git a/modules/core/src/main/findbugs/exclude-filter.xml
b/modules/mini/src/main/spotbugs/exclude-filter.xml
similarity index 77%
rename from modules/core/src/main/findbugs/exclude-filter.xml
rename to modules/mini/src/main/spotbugs/exclude-filter.xml
index 0c14e36..9cfeb5c 100644
--- a/modules/core/src/main/findbugs/exclude-filter.xml
+++ b/modules/mini/src/main/spotbugs/exclude-filter.xml
@@ -15,7 +15,9 @@
limitations under the License.
-->
<FindBugsFilter>
- <Match>
- <Class name="~org\.apache\.fluo\.core\.thrift\..*" />
- </Match>
+ <Match>
+ <!-- Must ignore these everywhere, because of a javac byte code generation
bug -->
+ <!-- https://github.com/spotbugs/spotbugs/issues/756 -->
+ <Bug pattern="RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE" />
+ </Match>
</FindBugsFilter>
diff --git a/pom.xml b/pom.xml
index cadbbd5..e003d77 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@
<version>3</version>
</parent>
<artifactId>fluo-project</artifactId>
- <version>1.3.0-SNAPSHOT</version>
+ <version>2.0.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Apache Fluo Project</name>
<description>An implementation of Percolator for Apache
Accumulo</description>
@@ -35,7 +35,6 @@
<modules>
<module>modules/accumulo</module>
<module>modules/api</module>
- <module>modules/cluster</module>
<module>modules/command</module>
<module>modules/core</module>
<module>modules/distribution</module>
@@ -57,32 +56,27 @@
<accumulo.version>2.0.0</accumulo.version>
<curator.version>4.0.1</curator.version>
<dropwizard.version>0.8.1</dropwizard.version>
- <findbugs.maxRank>11</findbugs.maxRank>
+ <!-- Prevent findbugs from running because it does not work with Java 11
and is configured to run by parent pom. Spotbugs is configured in place of
findbugs. -->
+ <findbugs.skip>true</findbugs.skip>
<hadoop.version>3.1.1</hadoop.version>
- <logback.version>1.2.3</logback.version>
<releaseProfiles>fluo-release</releaseProfiles>
<slf4j.version>1.7.12</slf4j.version>
- <twill.version>0.13.0</twill.version>
+ <spotbugs.version>3.1.12</spotbugs.version>
<zookeeper.version>3.4.8</zookeeper.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <version>${logback.version}</version>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-core</artifactId>
- <version>${logback.version}</version>
- </dependency>
- <dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
<version>1.72</version>
</dependency>
<dependency>
+ <groupId>com.github.spotbugs</groupId>
+ <artifactId>spotbugs-annotations</artifactId>
+ <version>${spotbugs.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
@@ -174,11 +168,6 @@
</dependency>
<dependency>
<groupId>org.apache.fluo</groupId>
- <artifactId>fluo-cluster</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.fluo</groupId>
<artifactId>fluo-command</artifactId>
<version>${project.version}</version>
</dependency>
@@ -213,16 +202,6 @@
<version>${hadoop.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.twill</groupId>
- <artifactId>twill-api</artifactId>
- <version>${twill.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.twill</groupId>
- <artifactId>twill-yarn</artifactId>
- <version>${twill.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
@@ -258,6 +237,26 @@
<pluginManagement>
<plugins>
<plugin>
+ <groupId>com.github.spotbugs</groupId>
+ <artifactId>spotbugs-maven-plugin</artifactId>
+ <version>${spotbugs.version}.1</version>
+ <configuration>
+ <xmlOutput>true</xmlOutput>
+ <effort>Max</effort>
+ <failOnError>true</failOnError>
+ <includeTests>true</includeTests>
+ <maxRank>16</maxRank>
+
<jvmArgs>-Dcom.overstock.findbugs.ignore=com.google.common.util.concurrent.RateLimiter,com.google.common.hash.Hasher,com.google.common.hash.HashCode,com.google.common.hash.HashFunction,com.google.common.hash.Hashing,com.google.common.cache.Cache,com.google.common.io.CountingOutputStream,com.google.common.io.ByteStreams,com.google.common.cache.LoadingCache,com.google.common.base.Stopwatch,com.google.common.cache.RemovalNotification,com.google.common.util.concurrent.Uninterrupt
[...]
+ <plugins combine.children="append">
+ <plugin>
+ <groupId>com.overstock.findbugs</groupId>
+ <artifactId>library-detectors</artifactId>
+ <version>1.2.0</version>
+ </plugin>
+ </plugins>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo2-maven-plugin</artifactId>
<version>1.0.0</version>
@@ -329,6 +328,19 @@
</pluginManagement>
<plugins>
<plugin>
+ <groupId>com.github.spotbugs</groupId>
+ <artifactId>spotbugs-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>run-spotbugs</id>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
@@ -357,12 +369,12 @@
<accumulo.skip>true</accumulo.skip>
<apilyzer.skip>true</apilyzer.skip>
<checkstyle.skip>true</checkstyle.skip>
- <findbugs.skip>true</findbugs.skip>
<mdep.analyze.skip>true</mdep.analyze.skip>
<modernizer.skip>true</modernizer.skip>
<rat.skip>true</rat.skip>
<skipITs>true</skipITs>
<skipTests>true</skipTests>
+ <spotbugs.skip>true</spotbugs.skip>
</properties>
</profile>
<profile>
@@ -370,10 +382,10 @@
<!-- some properties to make the release build a bit faster -->
<properties>
<checkstyle.skip>true</checkstyle.skip>
- <findbugs.skip>true</findbugs.skip>
<modernizer.skip>true</modernizer.skip>
<skipITs>true</skipITs>
<skipTests>true</skipTests>
+ <spotbugs.skip>true</spotbugs.skip>
</properties>
</profile>
<profile>
@@ -396,5 +408,16 @@
</pluginManagement>
</build>
</profile>
+ <profile>
+ <id>add-spotbugs-excludes</id>
+ <activation>
+ <file>
+ <exists>src/main/spotbugs/exclude-filter.xml</exists>
+ </file>
+ </activation>
+ <properties>
+
<spotbugs.excludeFilterFile>src/main/spotbugs/exclude-filter.xml</spotbugs.excludeFilterFile>
+ </properties>
+ </profile>
</profiles>
</project>