This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/fluo.git
The following commit(s) were added to refs/heads/master by this push:
new 282b94e Updates for Accumulo 2.0 alpha2 release (#1068)
282b94e is described below
commit 282b94e3f5239f62299ab30c0e558c0004f94f3c
Author: Keith Turner <[email protected]>
AuthorDate: Mon Feb 11 16:35:24 2019 -0500
Updates for Accumulo 2.0 alpha2 release (#1068)
---
.../apache/fluo/accumulo/util/AccumuloProps.java | 3 ++
.../java/org/apache/fluo/command/FluoWait.java | 9 +----
.../org/apache/fluo/core/client/FluoAdminImpl.java | 45 ++++++++++++---------
.../org/apache/fluo/core/impl/Environment.java | 8 ++--
.../org/apache/fluo/core/impl/LockResolver.java | 11 ++----
.../org/apache/fluo/core/impl/TransactionImpl.java | 27 ++++++-------
.../org/apache/fluo/core/util/AccumuloUtil.java | 18 +--------
.../java/org/apache/fluo/core/util/ColumnUtil.java | 46 +++++++++++-----------
.../java/org/apache/fluo/core/util/ScanUtil.java | 16 ++------
.../fluo/core/worker/finder/hash/ScanTask.java | 43 ++++++++++----------
.../java/org/apache/fluo/integration/ITBase.java | 11 ++++--
.../org/apache/fluo/integration/ITBaseImpl.java | 9 +++--
.../org/apache/fluo/integration/ITBaseMini.java | 9 +++--
.../apache/fluo/integration/TestTransaction.java | 38 +++++++++---------
.../fluo/integration/client/FluoAdminImplIT.java | 25 ++++++------
modules/mapreduce/pom.xml | 4 --
.../fluo/mapreduce/FluoEntryInputFormat.java | 7 +++-
.../apache/fluo/mapreduce/FluoRowInputFormat.java | 7 +++-
pom.xml | 2 +-
19 files changed, 163 insertions(+), 175 deletions(-)
diff --git
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
index 9e9c84e..f6585de 100644
---
a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
+++
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/util/AccumuloProps.java
@@ -17,6 +17,9 @@ package org.apache.fluo.accumulo.util;
public class AccumuloProps {
+ public static final String CLIENT_INSTANCE_NAME = "instance.name";
+ public static final String CLIENT_ZOOKEEPERS = "instance.zookeepers";
+
public static final String TABLE_BLOCKCACHE_ENABLED =
"table.cache.block.enable";
public static final String TABLE_CLASSPATH = "table.classpath.context";
public static final String TABLE_DELETE_BEHAVIOR = "table.delete.behavior";
diff --git
a/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
b/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
index ce6d459..464a904 100644
--- a/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
+++ b/modules/command/src/main/java/org/apache/fluo/command/FluoWait.java
@@ -49,17 +49,12 @@ public class FluoWait {
private static boolean hasNotifications(Environment env, TableRange range)
throws TableNotFoundException {
- Scanner scanner = null;
- try {
- scanner = env.getAccumuloClient().createScanner(env.getTable(),
env.getAuthorizations());
+ try (Scanner scanner =
+ env.getAccumuloClient().createScanner(env.getTable(),
env.getAuthorizations())) {
scanner.setRange(range.getRange());
Notification.configureScanner(scanner);
return scanner.iterator().hasNext();
- } finally {
- if (scanner != null) {
- scanner.close();
- }
}
}
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
index 95a8892..225cd90 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
@@ -117,9 +117,14 @@ public class FluoAdminImpl implements FluoAdmin {
"Fluo application already initialized at " +
config.getAppZookeepers());
}
- AccumuloClient conn = AccumuloUtil.getClient(config);
+ try (AccumuloClient client = AccumuloUtil.getClient(config)) {
+ initialize(opts, client);
+ }
+ }
- boolean tableExists =
conn.tableOperations().exists(config.getAccumuloTable());
+ private void initialize(InitializationOptions opts, AccumuloClient client)
+ throws TableExistsException, AlreadyInitializedException {
+ boolean tableExists =
client.tableOperations().exists(config.getAccumuloTable());
if (tableExists && !opts.getClearTable()) {
throw new TableExistsException("Accumulo table already exists " +
config.getAccumuloTable());
}
@@ -130,7 +135,7 @@ public class FluoAdminImpl implements FluoAdmin {
logger.info("The Accumulo table '{}' will be dropped and created as
requested by user",
config.getAccumuloTable());
try {
- conn.tableOperations().delete(config.getAccumuloTable());
+ client.tableOperations().delete(config.getAccumuloTable());
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -151,7 +156,7 @@ public class FluoAdminImpl implements FluoAdmin {
}
try {
- initializeApplicationInZooKeeper(conn);
+ initializeApplicationInZooKeeper(client);
String accumuloJars;
if (!config.getAccumuloJars().trim().isEmpty()) {
@@ -180,7 +185,7 @@ public class FluoAdminImpl implements FluoAdmin {
if (!accumuloClasspath.isEmpty()) {
String contextName = "fluo-" + config.getApplicationName();
- conn.instanceOperations().setProperty(
+ client.instanceOperations().setProperty(
AccumuloProps.VFS_CONTEXT_CLASSPATH_PROPERTY + contextName,
accumuloClasspath);
ntcProps.put(AccumuloProps.TABLE_CLASSPATH, contextName);
}
@@ -201,7 +206,7 @@ public class FluoAdminImpl implements FluoAdmin {
configureIterators(ntc);
ntc.setProperties(ntcProps);
- conn.tableOperations().create(config.getAccumuloTable(), ntc);
+ client.tableOperations().create(config.getAccumuloTable(), ntc);
updateSharedConfig();
} catch (NodeExistsException nee) {
@@ -246,16 +251,16 @@ public class FluoAdminImpl implements FluoAdmin {
throw new FluoException("Must stop the oracle server to remove an
application");
}
- AccumuloClient conn = AccumuloUtil.getClient(config);
-
- boolean tableExists =
conn.tableOperations().exists(config.getAccumuloTable());
- // With preconditions met, it's now OK to delete table & zookeeper root
(if they exist)
- if (tableExists) {
- logger.info("The Accumulo table '{}' will be dropped",
config.getAccumuloTable());
- try {
- conn.tableOperations().delete(config.getAccumuloTable());
- } catch (Exception e) {
- throw new RuntimeException(e);
+ try (AccumuloClient client = AccumuloUtil.getClient(config)) {
+ boolean tableExists =
client.tableOperations().exists(config.getAccumuloTable());
+ // With preconditions met, it's now OK to delete table & zookeeper root
(if they exist)
+ if (tableExists) {
+ logger.info("The Accumulo table '{}' will be dropped",
config.getAccumuloTable());
+ try {
+ client.tableOperations().delete(config.getAccumuloTable());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
@@ -276,7 +281,8 @@ public class FluoAdminImpl implements FluoAdmin {
private void initializeApplicationInZooKeeper(AccumuloClient client) throws
Exception {
- final String accumuloInstanceName = client.info().getInstanceName();
+ final String accumuloInstanceName =
+ client.properties().getProperty(AccumuloProps.CLIENT_INSTANCE_NAME);
final String accumuloInstanceID = client.getInstanceID();
final String fluoApplicationID = UUID.randomUUID().toString();
@@ -545,7 +551,8 @@ public class FluoAdminImpl implements FluoAdmin {
if (!config.hasRequiredAdminProps()) {
throw new IllegalArgumentException("Admin configuration is missing
required properties");
}
- AccumuloClient client = AccumuloUtil.getClient(config);
- return client.tableOperations().exists(config.getAccumuloTable());
+ try (AccumuloClient client = AccumuloUtil.getClient(config)) {
+ return client.tableOperations().exists(config.getAccumuloTable());
+ }
}
}
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
index 766a4a9..a5c389f 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/Environment.java
@@ -91,9 +91,10 @@ public class Environment implements AutoCloseable {
ensureDeletesAreDisabled();
- if (!client.info().getInstanceName().equals(accumuloInstance)) {
- throw new IllegalArgumentException("unexpected accumulo instance name "
- + client.info().getInstanceName() + " != " + accumuloInstance);
+ String instanceName =
client.properties().getProperty(AccumuloProps.CLIENT_INSTANCE_NAME);
+ if (!instanceName.equals(accumuloInstance)) {
+ throw new IllegalArgumentException(
+ "unexpected accumulo instance name " + instanceName + " != " +
accumuloInstance);
}
if (!client.getInstanceID().equals(accumuloInstanceID)) {
@@ -251,5 +252,6 @@ public class Environment implements AutoCloseable {
@Override
public void close() {
resources.close();
+ client.close();
}
}
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
index 51cdd5b..33f8487 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java
@@ -317,10 +317,9 @@ public class LockResolver {
}
}
- BatchScanner bscanner = null;
- try {
- bscanner =
- env.getAccumuloClient().createBatchScanner(env.getTable(),
env.getAuthorizations(), 1);
+
+ try (BatchScanner bscanner =
+ env.getAccumuloClient().createBatchScanner(env.getTable(),
env.getAuthorizations(), 1)) {
bscanner.setRanges(ranges);
IteratorSetting iterCfg = new IteratorSetting(10,
OpenReadLockIterator.class);
@@ -336,10 +335,6 @@ public class LockResolver {
return ret;
- } finally {
- if (bscanner != null) {
- bscanner.close();
- }
}
}
}
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
index 99551d0..b898492 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@@ -674,25 +674,22 @@ public class TransactionImpl extends
AbstractTransactionBase implements AsyncTra
Range range = new Range(startKey, endKey);
- Scanner scanner;
- try {
- // TODO reuse or share scanner
- scanner =
- env.getAccumuloClient().createScanner(env.getTable(),
env.getAuthorizations());
+ try (Scanner scanner =
+ env.getAccumuloClient().createScanner(env.getTable(),
env.getAuthorizations())) {
+ scanner.setRange(range);
+
+ // TODO could use iterator that stops after 1st ACK. thought of
using versioning iter
+ // but
+ // it scans to ACK
+ if (scanner.iterator().hasNext()) {
+ env.getSharedResources().getBatchWriter()
+ .writeMutationAsync(notification.newDelete(env));
+ return true;
+ }
} catch (TableNotFoundException e) {
// TODO proper exception handling
throw new RuntimeException(e);
}
-
- scanner.setRange(range);
-
- // TODO could use iterator that stops after 1st ACK. thought of
using versioning iter but
- // it scans to ACK
- if (scanner.iterator().hasNext()) {
- env.getSharedResources().getBatchWriter()
- .writeMutationAsync(notification.newDelete(env));
- return true;
- }
}
}
}
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
b/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
index 3257bd5..b6eaeb6 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/AccumuloUtil.java
@@ -17,9 +17,6 @@ package org.apache.fluo.core.util;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.ClientInfo;
import org.apache.fluo.api.config.FluoConfiguration;
/**
@@ -32,18 +29,7 @@ public class AccumuloUtil {
* Creates Accumulo connector given FluoConfiguration
*/
public static AccumuloClient getClient(FluoConfiguration config) {
- try {
- return Accumulo.newClient()
- .forInstance(config.getAccumuloInstance(),
config.getAccumuloZookeepers())
- .usingPassword(config.getAccumuloUser(),
config.getAccumuloPassword()).build();
- } catch (AccumuloException | AccumuloSecurityException e) {
- throw new IllegalStateException(e);
- }
- }
-
- public static ClientInfo getClientInfo(FluoConfiguration config) {
- return Accumulo.newClient()
- .forInstance(config.getAccumuloInstance(),
config.getAccumuloZookeepers())
- .usingPassword(config.getAccumuloUser(),
config.getAccumuloPassword()).info();
+ return Accumulo.newClient().to(config.getAccumuloInstance(),
config.getAccumuloZookeepers())
+ .as(config.getAccumuloUser(), config.getAccumuloPassword()).build();
}
}
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
index bf32a61..51e4f15 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ColumnUtil.java
@@ -75,34 +75,32 @@ public class ColumnUtil {
Column col) {
Span span = Span.exact(row, col);
- Scanner scanner;
- try {
- // TODO reuse or share scanner
- scanner = env.getAccumuloClient().createScanner(env.getTable(),
env.getAuthorizations());
+ try (Scanner scanner =
+ env.getAccumuloClient().createScanner(env.getTable(),
env.getAuthorizations())) {
+ scanner.setRange(SpanUtil.toRange(span));
+ scanner.addScanIterator(iterConf);
+
+ Iterator<Entry<Key, Value>> iter = scanner.iterator();
+ if (iter.hasNext()) {
+ Entry<Key, Value> entry = iter.next();
+
+ Key k = entry.getKey();
+ Bytes r = Bytes.of(k.getRowData().toArray());
+ Bytes cf = Bytes.of(k.getColumnFamilyData().toArray());
+ Bytes cq = Bytes.of(k.getColumnQualifierData().toArray());
+ Bytes cv = Bytes.of(k.getColumnVisibilityData().toArray());
+
+ if (r.equals(row) && cf.equals(col.getFamily()) &&
cq.equals(col.getQualifier())
+ && cv.equals(col.getVisibility())) {
+ return entry;
+ } else {
+ throw new RuntimeException("unexpected key " + k + " " + row + " " +
col);
+ }
+ }
} catch (TableNotFoundException e) {
// TODO proper exception handling
throw new RuntimeException(e);
}
- scanner.setRange(SpanUtil.toRange(span));
- scanner.addScanIterator(iterConf);
-
- Iterator<Entry<Key, Value>> iter = scanner.iterator();
- if (iter.hasNext()) {
- Entry<Key, Value> entry = iter.next();
-
- Key k = entry.getKey();
- Bytes r = Bytes.of(k.getRowData().toArray());
- Bytes cf = Bytes.of(k.getColumnFamilyData().toArray());
- Bytes cq = Bytes.of(k.getColumnQualifierData().toArray());
- Bytes cv = Bytes.of(k.getColumnVisibilityData().toArray());
-
- if (r.equals(row) && cf.equals(col.getFamily()) &&
cq.equals(col.getQualifier())
- && cv.equals(col.getVisibility())) {
- return entry;
- } else {
- throw new RuntimeException("unexpected key " + k + " " + row + " " +
col);
- }
- }
return null;
}
diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
index 2c66cbf..7628274 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/util/ScanUtil.java
@@ -162,10 +162,7 @@ public class ScanUtil {
Span span = getSpan(options);
Collection<Column> columns = getColumns(options);
- Scanner scanner = null;
- try {
- scanner = client.createScanner(sConfig.getAccumuloTable(),
Authorizations.EMPTY);
-
+ try (Scanner scanner = client.createScanner(sConfig.getAccumuloTable(),
Authorizations.EMPTY)) {
scanner.setRange(SpanUtil.toRange(span));
NotificationScanner ntfyScanner = new NotificationScanner(scanner,
columns);
@@ -173,10 +170,6 @@ public class ScanUtil {
scan(options, out, ntfyScanner);
} catch (TableNotFoundException e) {
throw new RuntimeException(e);
- } finally {
- if (scanner != null) {
- scanner.close();
- }
}
}
@@ -210,13 +203,12 @@ public class ScanUtil {
public static void scanAccumulo(ScanOpts options, FluoConfiguration sConfig,
PrintStream out) {
- AccumuloClient client = AccumuloUtil.getClient(sConfig);
-
Span span = getSpan(options);
Collection<Column> columns = getColumns(options);
- try {
- Scanner scanner = client.createScanner(sConfig.getAccumuloTable(),
Authorizations.EMPTY);
+ try (AccumuloClient client = AccumuloUtil.getClient(sConfig);
+ Scanner scanner = client.createScanner(sConfig.getAccumuloTable(),
Authorizations.EMPTY)) {
+
scanner.setRange(SpanUtil.toRange(span));
for (Column col : columns) {
if (col.isQualifierSet()) {
diff --git
a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
index ffe9194..1a47f2a 100644
---
a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
+++
b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/ScanTask.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -180,34 +180,35 @@ public class ScanTask implements Runnable {
private ScanCounts scan(Session session, PartitionInfo pi, Range range)
throws TableNotFoundException {
- Scanner scanner =
- env.getAccumuloClient().createScanner(env.getTable(),
env.getAuthorizations());
+ try (Scanner scanner =
+ env.getAccumuloClient().createScanner(env.getTable(),
env.getAuthorizations())) {
- scanner.setRange(range);
+ scanner.setRange(range);
- Notification.configureScanner(scanner);
+ Notification.configureScanner(scanner);
- IteratorSetting iterCfg = new IteratorSetting(30, "nhf",
NotificationHashFilter.class);
- NotificationHashFilter.setModulusParams(iterCfg, pi.getMyGroupSize(),
pi.getMyIdInGroup());
- scanner.addScanIterator(iterCfg);
+ IteratorSetting iterCfg = new IteratorSetting(30, "nhf",
NotificationHashFilter.class);
+ NotificationHashFilter.setModulusParams(iterCfg, pi.getMyGroupSize(),
pi.getMyIdInGroup());
+ scanner.addScanIterator(iterCfg);
- ScanCounts counts = new ScanCounts();
+ ScanCounts counts = new ScanCounts();
- for (Entry<Key, Value> entry : scanner) {
- if (!pi.equals(partitionManager.getPartitionInfo())) {
- throw new PartitionInfoChangedException();
- }
+ for (Entry<Key, Value> entry : scanner) {
+ if (!pi.equals(partitionManager.getPartitionInfo())) {
+ throw new PartitionInfoChangedException();
+ }
- if (stopped.get()) {
- return counts;
- }
+ if (stopped.get()) {
+ return counts;
+ }
- counts.seen++;
+ counts.seen++;
- if (session.addNotification(finder, Notification.from(entry.getKey()))) {
- counts.added++;
+ if (session.addNotification(finder,
Notification.from(entry.getKey()))) {
+ counts.added++;
+ }
}
+ return counts;
}
- return counts;
}
}
diff --git
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java
index 92436f4..3bb62e3 100644
---
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java
+++
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBase.java
@@ -16,12 +16,12 @@
package org.apache.fluo.integration;
import java.io.File;
+import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.ClientInfo;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.minicluster.MiniAccumuloConfig;
import org.apache.commons.io.FileUtils;
@@ -48,7 +48,6 @@ public class ITBase {
protected static String instanceName;
protected static AccumuloClient aClient;
- protected static ClientInfo clientInfo;
private static MiniAccumuloCluster cluster;
private static boolean startedCluster = false;
@@ -87,8 +86,8 @@ public class ITBase {
cluster.start();
startedCluster = true;
}
- clientInfo = MiniAccumuloCluster.getClientInfo(instanceDir);
- aClient = Accumulo.newClient().usingClientInfo(clientInfo).build();
+ Properties props = MiniAccumuloCluster.getClientProperties(instanceDir);
+ aClient = Accumulo.newClient().from(props).build();
}
protected Class<? extends ObserverProvider> getObserverProviderClass() {
@@ -128,6 +127,10 @@ public class ITBase {
@AfterClass
public static void tearDownAccumulo() throws Exception {
+ if (aClient != null) {
+ aClient.close();
+ }
+
if (startedCluster) {
cluster.stop();
}
diff --git
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseImpl.java
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseImpl.java
index 43bc869..1f77d3f 100644
---
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseImpl.java
+++
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseImpl.java
@@ -15,8 +15,10 @@
package org.apache.fluo.integration;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import org.apache.fluo.accumulo.util.AccumuloProps;
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.client.FluoAdmin.InitializationOptions;
import org.apache.fluo.api.client.FluoFactory;
@@ -66,14 +68,15 @@ public class ITBaseImpl extends ITBase {
table = getNextTableName();
+ Properties props = aClient.properties();
config = new FluoConfiguration();
config.setApplicationName("impl-test" + testCounter.getAndIncrement());
- config.setAccumuloInstance(clientInfo.getInstanceName());
+
config.setAccumuloInstance(props.getProperty(AccumuloProps.CLIENT_INSTANCE_NAME));
config.setAccumuloUser(USER);
config.setAccumuloPassword(PASSWORD);
config.setAccumuloTable(table);
- config.setAccumuloZookeepers(clientInfo.getZooKeepers());
- config.setInstanceZookeepers(clientInfo.getZooKeepers() + "/fluo");
+
config.setAccumuloZookeepers(props.getProperty(AccumuloProps.CLIENT_ZOOKEEPERS));
+
config.setInstanceZookeepers(props.getProperty(AccumuloProps.CLIENT_ZOOKEEPERS)
+ "/fluo");
config.setTransactionRollbackTime(1, TimeUnit.SECONDS);
setupObservers(config);
config.setProperty(FluoConfigurationImpl.ZK_UPDATE_PERIOD_PROP, "1000");
diff --git
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseMini.java
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseMini.java
index d31bf9f..f516a10 100644
---
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseMini.java
+++
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/ITBaseMini.java
@@ -15,8 +15,10 @@
package org.apache.fluo.integration;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import org.apache.fluo.accumulo.util.AccumuloProps;
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.client.FluoAdmin.InitializationOptions;
import org.apache.fluo.api.client.FluoFactory;
@@ -40,13 +42,14 @@ public class ITBaseMini extends ITBase {
@Before
public void setUpFluo() throws Exception {
+ Properties props = aClient.properties();
config = new FluoConfiguration();
config.setApplicationName("mini-test" + testCounter.getAndIncrement());
- config.setAccumuloInstance(clientInfo.getInstanceName());
+
config.setAccumuloInstance(props.getProperty(AccumuloProps.CLIENT_INSTANCE_NAME));
config.setAccumuloUser(USER);
config.setAccumuloPassword(PASSWORD);
- config.setAccumuloZookeepers(clientInfo.getZooKeepers());
- config.setInstanceZookeepers(clientInfo.getZooKeepers() + "/fluo");
+
config.setAccumuloZookeepers(props.getProperty(AccumuloProps.CLIENT_ZOOKEEPERS));
+
config.setInstanceZookeepers(props.getProperty(AccumuloProps.CLIENT_ZOOKEEPERS)
+ "/fluo");
config.setAccumuloTable(getNextTableName());
config.setWorkerThreads(5);
setupObservers(config);
diff --git
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/TestTransaction.java
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/TestTransaction.java
index ebb02d1..7d62f48 100644
---
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/TestTransaction.java
+++
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/TestTransaction.java
@@ -4,9 +4,9 @@
* copyright ownership. The ASF licenses this file to you under the Apache
License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
License. You may obtain a
* copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
* or implied. See the License for the specific language governing permissions
and limitations under
@@ -55,28 +55,28 @@ public class TestTransaction extends
AbstractTransactionBase implements Transact
private Environment env;
public static long getNotificationTS(Environment env, String row, Column
col) {
- Scanner scanner;
- try {
- scanner = env.getAccumuloClient().createScanner(env.getTable(),
env.getAuthorizations());
- } catch (TableNotFoundException e) {
- throw new RuntimeException(e);
- }
- IteratorSetting iterCfg = new IteratorSetting(11,
NotificationIterator.class);
- scanner.addScanIterator(iterCfg);
+ try (Scanner scanner =
+ env.getAccumuloClient().createScanner(env.getTable(),
env.getAuthorizations())) {
+
+ IteratorSetting iterCfg = new IteratorSetting(11,
NotificationIterator.class);
+ scanner.addScanIterator(iterCfg);
- Text cv = ByteUtil.toText(col.getVisibility());
+ Text cv = ByteUtil.toText(col.getVisibility());
- scanner.setRange(SpanUtil.toRange(Span.prefix(row)));
- scanner.fetchColumn(ByteUtil.toText(ColumnConstants.NOTIFY_CF),
- new Text(NotificationUtil.encodeCol(col)));
+ scanner.setRange(SpanUtil.toRange(Span.prefix(row)));
+ scanner.fetchColumn(ByteUtil.toText(ColumnConstants.NOTIFY_CF),
+ new Text(NotificationUtil.encodeCol(col)));
- for (Entry<Key, org.apache.accumulo.core.data.Value> entry : scanner) {
- if (entry.getKey().getColumnVisibility().equals(cv)) {
- return Notification.from(entry.getKey()).getTimestamp();
+ for (Entry<Key, org.apache.accumulo.core.data.Value> entry : scanner) {
+ if (entry.getKey().getColumnVisibility().equals(cv)) {
+ return Notification.from(entry.getKey()).getTimestamp();
+ }
}
- }
- throw new RuntimeException("No notification found");
+ throw new RuntimeException("No notification found");
+ } catch (TableNotFoundException e) {
+ throw new RuntimeException(e);
+ }
}
@SuppressWarnings("resource")
diff --git
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
index a9222b0..87996e7 100644
---
a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
+++
b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
@@ -114,18 +114,19 @@ public class FluoAdminImplIT extends ITBaseImpl {
admin.initialize(opts);
// verify locality groups were set on the table
- AccumuloClient client = AccumuloUtil.getClient(config);
- Map<String, Set<Text>> localityGroups =
-
client.tableOperations().getLocalityGroups(config.getAccumuloTable());
- Assert.assertEquals("Unexpected locality group count.", 1,
localityGroups.size());
- Entry<String, Set<Text>> localityGroup =
localityGroups.entrySet().iterator().next();
- Assert.assertEquals("'notify' locality group not found.",
- ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME, localityGroup.getKey());
- Assert.assertEquals("'notify' locality group does not contain exactly 1
column family.", 1,
- localityGroup.getValue().size());
- Text colFam = localityGroup.getValue().iterator().next();
- Assert.assertTrue("'notify' locality group does not contain the correct
column family.",
- ColumnConstants.NOTIFY_CF.contentEquals(colFam.getBytes(), 0,
colFam.getLength()));
+ try (AccumuloClient client = AccumuloUtil.getClient(config)) {
+ Map<String, Set<Text>> localityGroups =
+
client.tableOperations().getLocalityGroups(config.getAccumuloTable());
+ Assert.assertEquals("Unexpected locality group count.", 1,
localityGroups.size());
+ Entry<String, Set<Text>> localityGroup =
localityGroups.entrySet().iterator().next();
+ Assert.assertEquals("'notify' locality group not found.",
+ ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME,
localityGroup.getKey());
+ Assert.assertEquals("'notify' locality group does not contain exactly
1 column family.", 1,
+ localityGroup.getValue().size());
+ Text colFam = localityGroup.getValue().iterator().next();
+ Assert.assertTrue("'notify' locality group does not contain the
correct column family.",
+ ColumnConstants.NOTIFY_CF.contentEquals(colFam.getBytes(), 0,
colFam.getLength()));
+ }
}
try (FluoClientImpl client = new FluoClientImpl(localConfig)) {
diff --git a/modules/mapreduce/pom.xml b/modules/mapreduce/pom.xml
index a6d95d7..c4a1339 100644
--- a/modules/mapreduce/pom.xml
+++ b/modules/mapreduce/pom.xml
@@ -28,10 +28,6 @@
<dependencies>
<dependency>
<groupId>org.apache.accumulo</groupId>
- <artifactId>accumulo-client-mapreduce</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
</dependency>
<dependency>
diff --git
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
index e09b86f..9d58a86 100644
---
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
+++
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Bytes;
@@ -34,7 +35,6 @@ import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl;
-import org.apache.fluo.core.util.AccumuloUtil;
import org.apache.fluo.core.util.SpanUtil;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -154,7 +154,10 @@ public class FluoEntryInputFormat extends
InputFormat<RowColumn, Bytes> {
conf.getConfiguration().set(PROPS_CONF_KEY,
new String(baos.toByteArray(), StandardCharsets.UTF_8));
- AccumuloInputFormat.setClientInfo(conf,
AccumuloUtil.getClientInfo(fconfig));
+ AccumuloInputFormat.setZooKeeperInstance(conf,
fconfig.getAccumuloInstance(),
+ fconfig.getAccumuloZookeepers());
+ AccumuloInputFormat.setConnectorInfo(conf, fconfig.getAccumuloUser(),
+ new PasswordToken(fconfig.getAccumuloPassword()));
AccumuloInputFormat.setInputTableName(conf, env.getTable());
AccumuloInputFormat.setScanAuthorizations(conf,
env.getAuthorizations());
}
diff --git
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
index 8993f38..1c5616d 100644
---
a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
+++
b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoRowInputFormat.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.fluo.api.client.scanner.ColumnScanner;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
@@ -34,7 +35,6 @@ import org.apache.fluo.api.data.ColumnValue;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl;
-import org.apache.fluo.core.util.AccumuloUtil;
import org.apache.fluo.core.util.SpanUtil;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -155,7 +155,10 @@ public class FluoRowInputFormat extends InputFormat<Bytes,
Iterator<ColumnValue>
conf.getConfiguration().set(PROPS_CONF_KEY,
new String(baos.toByteArray(), StandardCharsets.UTF_8));
- AccumuloInputFormat.setClientInfo(conf,
AccumuloUtil.getClientInfo(fconfig));
+ AccumuloInputFormat.setZooKeeperInstance(conf,
fconfig.getAccumuloInstance(),
+ fconfig.getAccumuloZookeepers());
+ AccumuloInputFormat.setConnectorInfo(conf, fconfig.getAccumuloUser(),
+ new PasswordToken(fconfig.getAccumuloPassword()));
AccumuloInputFormat.setInputTableName(conf, env.getTable());
AccumuloInputFormat.setScanAuthorizations(conf,
env.getAuthorizations());
}
diff --git a/pom.xml b/pom.xml
index 2dfa651..58abc63 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,7 @@
<url>https://github.com/apache/fluo/issues</url>
</issueManagement>
<properties>
- <accumulo.version>2.0.0-alpha-1</accumulo.version>
+ <accumulo.version>2.0.0-alpha-2</accumulo.version>
<curator.version>4.0.1</curator.version>
<dropwizard.version>0.8.1</dropwizard.version>
<findbugs.maxRank>11</findbugs.maxRank>