This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/fluo-recipes.git
commit 91ac935e48f1c88da7a0b1bdf952d0c9eff2ff9c Author: Christopher Tubbs <[email protected]> AuthorDate: Mon Nov 27 17:42:47 2017 -0500 Apply formatter changes from parent POM 3 --- .../recipes/accumulo/cmds/CompactTransient.java | 4 +- .../recipes/accumulo/export/AccumuloExporter.java | 9 +- .../accumulo/export/function/AccumuloExporter.java | 5 +- .../accumulo/export/function/AccumuloWriter.java | 5 +- .../fluo/recipes/accumulo/ops/TableOperations.java | 16 +-- .../recipes/core/combine/CombineQueueImpl.java | 20 ++- .../fluo/recipes/core/combine/CqConfigurator.java | 4 +- .../fluo/recipes/core/combine/CqOptimizer.java | 3 +- .../fluo/recipes/core/combine/InputImpl.java | 3 +- .../recipes/core/common/TransientRegistry.java | 5 +- .../fluo/recipes/core/export/ExportBucket.java | 15 +-- .../fluo/recipes/core/export/ExportObserver.java | 5 +- .../recipes/core/export/ExportObserverImpl.java | 8 +- .../fluo/recipes/core/export/ExportQueue.java | 20 ++- .../recipes/core/export/FluentConfigurator.java | 8 +- .../fluo/recipes/core/map/CollisionFreeMap.java | 17 ++- .../org/apache/fluo/recipes/core/map/Update.java | 4 +- .../core/serialization/SimpleSerializer.java | 10 +- .../fluo/recipes/core/transaction/LogEntry.java | 4 +- .../fluo/recipes/core/types/TypedSnapshotBase.java | 4 +- .../fluo/recipes/core/combine/SplitsTest.java | 12 +- .../core/combine/it/CombineQueueTreeIT.java | 8 +- .../fluo/recipes/core/data/RowHasherTest.java | 5 +- .../fluo/recipes/core/export/OptionsTest.java | 10 +- .../recipes/core/export/it/ExportTestBase.java | 4 +- .../apache/fluo/recipes/core/map/OptionsTest.java | 9 +- .../apache/fluo/recipes/core/map/SplitsTest.java | 22 ++- .../fluo/recipes/core/map/it/BigUpdateIT.java | 32 ++--- .../recipes/core/map/it/CollisionFreeMapIT.java | 9 +- .../fluo/recipes/core/map/it/DocumentObserver.java | 9 +- .../recipes/core/map/it/WordCountObserver.java | 4 +- .../core/transaction/RecordingTransactionTest.java | 52 +++---- .../fluo/recipes/core/types/TypeLayerTest.java | 150 +++++++++------------ .../fluo/recipes/kryo/KryoSimplerSerializer.java | 4 +- .../apache/fluo/recipes/spark/FluoSparkHelper.java | 12 +- .../fluo/recipes/spark/it/FluoSparkHelperIT.java | 8 +- .../org/apache/fluo/recipes/test/FluoITHelper.java | 17 ++- .../recipes/test/export/AccumuloExporterIT.java | 20 +-- 38 files changed, 251 insertions(+), 305 deletions(-) diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/CompactTransient.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/CompactTransient.java index eeaf693..f1e11df 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/CompactTransient.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/cmds/CompactTransient.java @@ -85,8 +85,8 @@ public class CompactTransient { public static void main(String[] args) throws Exception { if ((args.length == 1 && args[0].startsWith("-h")) || (args.length > 2)) { - System.out.println("Usage : " + CompactTransient.class.getName() - + " [<interval> [<multiplier>]]"); + System.out + .println("Usage : " + CompactTransient.class.getName() + " [<interval> [<multiplier>]]"); System.exit(-1); } diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java index 7ed11e0..cda3497 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/AccumuloExporter.java @@ -38,8 +38,8 @@ import org.apache.fluo.recipes.core.export.SequencedExport; * {@link AccumuloTranslator} */ @Deprecated -public abstract class AccumuloExporter<K, V> extends - org.apache.fluo.recipes.core.export.Exporter<K, V> { +public abstract class AccumuloExporter<K, V> + extends org.apache.fluo.recipes.core.export.Exporter<K, V> { /** * Use this to configure the Accumulo table where an AccumuloExporter's mutations will be written. @@ -76,9 +76,8 @@ public abstract class AccumuloExporter<K, V> extends FluoConfiguration tmpFc = new FluoConfiguration(); org.apache.fluo.recipes.accumulo.export.function.AccumuloExporter.configure("aecfgid") .instance(instanceName, zookeepers).credentials(user, password).table(table).save(tmpFc); - accumuloWriter = - new org.apache.fluo.recipes.accumulo.export.function.AccumuloExporter<K, V>("aecfgid", - tmpFc.getAppConfiguration(), this::translate); + accumuloWriter = new org.apache.fluo.recipes.accumulo.export.function.AccumuloExporter<K, V>( + "aecfgid", tmpFc.getAppConfiguration(), this::translate); } @Override diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/function/AccumuloExporter.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/function/AccumuloExporter.java index 1da0560..8a46ce2 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/function/AccumuloExporter.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/function/AccumuloExporter.java @@ -82,9 +82,8 @@ public class AccumuloExporter<K, V> implements Exporter<K, V> { public AccumuloExporter(String configId, SimpleConfiguration appConfig, AccumuloTranslator<K, V> translator) { AeFluentConfigurator cfg = AeFluentConfigurator.load(configId, appConfig); - this.writer = - AccumuloWriter.getInstance(cfg.getInstance(), cfg.getZookeepers(), cfg.getUser(), - cfg.getPassword(), cfg.getTable()); + this.writer = AccumuloWriter.getInstance(cfg.getInstance(), cfg.getZookeepers(), cfg.getUser(), + cfg.getPassword(), cfg.getTable()); this.translator = translator; } diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/function/AccumuloWriter.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/function/AccumuloWriter.java index 0c7fc85..23e9991 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/function/AccumuloWriter.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/function/AccumuloWriter.java @@ -59,9 +59,8 @@ class AccumuloWriter { ExportTask(String instanceName, String zookeepers, String user, String password, String table) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { - ZooKeeperInstance zki = - new ZooKeeperInstance(new ClientConfiguration().withInstance(instanceName).withZkHosts( - zookeepers)); + ZooKeeperInstance zki = new ZooKeeperInstance( + new ClientConfiguration().withInstance(instanceName).withZkHosts(zookeepers)); // TODO need to close batch writer Connector conn = zki.getConnector(user, new PasswordToken(password)); diff --git a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java index 5aaeb57..5dea9e3 100644 --- a/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java +++ b/modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/ops/TableOperations.java @@ -52,13 +52,12 @@ public class TableOperations { private static Connector getConnector(FluoConfiguration fluoConfig) throws Exception { - ZooKeeperInstance zki = - new ZooKeeperInstance(new ClientConfiguration().withInstance( - fluoConfig.getAccumuloInstance()).withZkHosts(fluoConfig.getAccumuloZookeepers())); + ZooKeeperInstance zki = new ZooKeeperInstance( + new ClientConfiguration().withInstance(fluoConfig.getAccumuloInstance()) + .withZkHosts(fluoConfig.getAccumuloZookeepers())); - Connector conn = - zki.getConnector(fluoConfig.getAccumuloUser(), - new PasswordToken(fluoConfig.getAccumuloPassword())); + Connector conn = zki.getConnector(fluoConfig.getAccumuloUser(), + new PasswordToken(fluoConfig.getAccumuloPassword())); return conn; } @@ -94,9 +93,8 @@ public class TableOperations { conn.tableOperations().setProperty(table, RGB_DEFAULT_PROP, "none"); conn.tableOperations().setProperty(table, TABLE_BALANCER_PROP, RGB_CLASS); } catch (AccumuloException e) { - logger - .warn("Unable to setup regex balancer (this is expected to fail in Accumulo 1.6.X) : " - + e.getMessage()); + logger.warn("Unable to setup regex balancer (this is expected to fail in Accumulo 1.6.X) : " + + e.getMessage()); logger.debug("Unable to setup regex balancer (this is expected to fail in Accumulo 1.6.X)", e); } diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CombineQueueImpl.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CombineQueueImpl.java index 48d2651..94941bf 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CombineQueueImpl.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CombineQueueImpl.java @@ -66,12 +66,10 @@ class CombineQueueImpl<K, V> implements CombineQueue<K, V> { this.updatePrefix = Bytes.of(cqId + ":u:"); this.dataPrefix = Bytes.of(cqId + ":d:"); this.notifyColumn = new Column("fluoRecipes", "cfm:" + cqId); - this.keyType = - (Class<K>) getClass().getClassLoader() - .loadClass(CqConfigurator.getKeyType(cqId, appConfig)); - this.valType = - (Class<V>) getClass().getClassLoader().loadClass( - CqConfigurator.getValueType(cqId, appConfig)); + this.keyType = (Class<K>) getClass().getClassLoader() + .loadClass(CqConfigurator.getKeyType(cqId, appConfig)); + this.valType = (Class<V>) getClass().getClassLoader() + .loadClass(CqConfigurator.getValueType(cqId, appConfig)); this.numBuckets = CqConfigurator.getNumBucket(cqId, appConfig); this.bufferSize = CqConfigurator.getBufferSize(cqId, appConfig); this.serializer = SimpleSerializer.getInstance(appConfig); @@ -177,13 +175,11 @@ class CombineQueueImpl<K, V> implements CombineQueue<K, V> { Span span; if (nextKey != null) { - Bytes startRow = - Bytes.builder(ntfyRow.length() + nextKey.length()).append(ntfyRow).append(nextKey) - .toBytes(); + Bytes startRow = Bytes.builder(ntfyRow.length() + nextKey.length()).append(ntfyRow) + .append(nextKey).toBytes(); Span tmpSpan = Span.prefix(ntfyRow); - Span nextSpan = - new Span(new RowColumn(startRow, UPDATE_COL), false, tmpSpan.getEnd(), - tmpSpan.isEndInclusive()); + Span nextSpan = new Span(new RowColumn(startRow, UPDATE_COL), false, tmpSpan.getEnd(), + tmpSpan.isEndInclusive()); span = nextSpan; } else { span = Span.prefix(ntfyRow); diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CqConfigurator.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CqConfigurator.java index 40c5b37..de3577a 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CqConfigurator.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CqConfigurator.java @@ -98,8 +98,8 @@ class CqConfigurator implements FluentArg1, FluentArg2, FluentArg3, FluentOption @Override public FluentOptions bucketsPerTablet(int bucketsPerTablet) { - Preconditions.checkArgument(bucketsPerTablet > 0, "bucketsPerTablet is <= 0 : " - + bucketsPerTablet); + Preconditions.checkArgument(bucketsPerTablet > 0, + "bucketsPerTablet is <= 0 : " + bucketsPerTablet); this.bucketsPerTablet = bucketsPerTablet; return this; } diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CqOptimizer.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CqOptimizer.java index b9e02d6..b918757 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CqOptimizer.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/CqOptimizer.java @@ -28,7 +28,8 @@ import org.apache.fluo.recipes.core.common.TableOptimizations; // This class intentionally package private. class CqOptimizer { - public static TableOptimizations getTableOptimizations(String cqId, SimpleConfiguration appConfig) { + public static TableOptimizations getTableOptimizations(String cqId, + SimpleConfiguration appConfig) { int numBuckets = CqConfigurator.getNumBucket(cqId, appConfig); int bpt = CqConfigurator.getBucketsPerTablet(cqId, appConfig); diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/InputImpl.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/InputImpl.java index aaca954..a5e8f9f 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/InputImpl.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/combine/InputImpl.java @@ -36,7 +36,8 @@ class InputImpl<K, V> implements Combiner.Input<K, V> { this.valuesCollection = serializedValues; } - InputImpl(K k, Function<Bytes, V> valDeser, Bytes currentValue, Collection<Bytes> serializedValues) { + InputImpl(K k, Function<Bytes, V> valDeser, Bytes currentValue, + Collection<Bytes> serializedValues) { this(k, valDeser, serializedValues); this.currentValue = currentValue; } diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TransientRegistry.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TransientRegistry.java index dc3922f..53106a5 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TransientRegistry.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/common/TransientRegistry.java @@ -70,9 +70,8 @@ public class TransientRegistry { String key = keys.next(); String val = appConfig.getString(key); String[] sa = val.split(":"); - RowRange rowRange = - new RowRange(Bytes.of(DatatypeConverter.parseHexBinary(sa[0])), - Bytes.of(DatatypeConverter.parseHexBinary(sa[1]))); + RowRange rowRange = new RowRange(Bytes.of(DatatypeConverter.parseHexBinary(sa[0])), + Bytes.of(DatatypeConverter.parseHexBinary(sa[1]))); ranges.add(rowRange); } return ranges; diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java index 7292ff9..68c8c8c 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportBucket.java @@ -111,9 +111,8 @@ class ExportBucket { } public void add(long seq, byte[] key, byte[] value) { - BytesBuilder builder = - Bytes.builder(bucketRow.length() + 1 + key.length + 8).append(bucketRow).append(':') - .append(key); + BytesBuilder builder = Bytes.builder(bucketRow.length() + 1 + key.length + 8).append(bucketRow) + .append(':').append(key); encSeq(builder, seq); ttx.set(builder.toBytes(), EXPORT_COL, Bytes.of(value)); } @@ -133,9 +132,8 @@ class ExportBucket { Span span; if (continueRow != null) { Span tmpSpan = Span.prefix(bucketRow); - Span nextSpan = - new Span(new RowColumn(continueRow, EXPORT_COL), true, tmpSpan.getEnd(), - tmpSpan.isEndInclusive()); + Span nextSpan = new Span(new RowColumn(continueRow, EXPORT_COL), true, tmpSpan.getEnd(), + tmpSpan.isEndInclusive()); span = nextSpan; } else { span = Span.prefix(bucketRow); @@ -191,9 +189,8 @@ class ExportBucket { } public void setContinueRow(ExportEntry ee) { - BytesBuilder builder = - Bytes.builder(bucketRow.length() + 1 + ee.key.length + 8).append(bucketRow).append(':') - .append(ee.key); + BytesBuilder builder = Bytes.builder(bucketRow.length() + 1 + ee.key.length + 8) + .append(bucketRow).append(':').append(ee.key); encSeq(builder, ee.seq); Bytes nextRow = builder.toBytes(); ttx.set(getMinimalRow(), NEXT_COL, nextRow); diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java index 6cfa3a3..f9efb08 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserver.java @@ -46,9 +46,8 @@ public class ExportObserver<K, V> extends org.apache.fluo.api.observer.AbstractO // TODO defer loading classes... so that not done during fluo init // TODO move class loading to centralized place... also attempt to check type params @SuppressWarnings("rawtypes") - Exporter exporter = - getClass().getClassLoader().loadClass(opts.fluentCfg.exporterType) - .asSubclass(Exporter.class).newInstance(); + Exporter exporter = getClass().getClassLoader().loadClass(opts.fluentCfg.exporterType) + .asSubclass(Exporter.class).newInstance(); SimpleSerializer serializer = SimpleSerializer.getInstance(context.getAppConfiguration()); diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserverImpl.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserverImpl.java index a95125d..4685dfb 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserverImpl.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportObserverImpl.java @@ -58,11 +58,9 @@ class ExportObserverImpl<K, V> implements Observer { Iterator<ExportEntry> input = bucket.getExportIterator(continueRow); MemLimitIterator memLimitIter = new MemLimitIterator(input, memLimit, 8 + queueId.length()); - Iterator<SequencedExport<K, V>> exportIterator = - Iterators.transform( - memLimitIter, - ee -> new SequencedExport<>(serializer.deserialize(ee.key, keyType), serializer - .deserialize(ee.value, valType), ee.seq)); + Iterator<SequencedExport<K, V>> exportIterator = Iterators.transform(memLimitIter, + ee -> new SequencedExport<>(serializer.deserialize(ee.key, keyType), + serializer.deserialize(ee.value, valType), ee.seq)); exportIterator = Iterators.consumingIterator(exportIterator); diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java index 0ee23dd..b79c080 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/ExportQueue.java @@ -190,9 +190,9 @@ public class ExportQueue<K, V> { SimpleConfiguration appConfig = fluoConfig.getAppConfiguration(); opts.save(appConfig); - fluoConfig - .addObserver(new org.apache.fluo.api.config.ObserverSpecification(ExportObserver.class - .getName(), Collections.singletonMap("queueId", opts.fluentCfg.queueId))); + fluoConfig.addObserver( + new org.apache.fluo.api.config.ObserverSpecification(ExportObserver.class.getName(), + Collections.singletonMap("queueId", opts.fluentCfg.queueId))); } /** @@ -246,11 +246,10 @@ public class ExportQueue<K, V> { */ public void registerObserver(ObserverProvider.Registry obsRegistry, org.apache.fluo.recipes.core.export.function.Exporter<K, V> exporter) { - Preconditions - .checkState( - opts.exporterType == null, - "Expected exporter type not be set, it was set to %s. Cannot not use the old and new way of configuring " - + "exporters at the same time.", opts.exporterType); + Preconditions.checkState(opts.exporterType == null, + "Expected exporter type not be set, it was set to %s. Cannot not use the old and new way of configuring " + + "exporters at the same time.", + opts.exporterType); Observer obs; try { obs = new ExportObserverImpl<K, V>(queueId, opts, serializer, exporter); @@ -292,9 +291,8 @@ public class ExportQueue<K, V> { // intentionally package private Options(String queueId, String keyType, String valueType, int buckets) { Preconditions.checkArgument(buckets > 0); - this.fluentCfg = - (FluentConfigurator) new FluentConfigurator(queueId).keyType(keyType) - .valueType(valueType).buckets(buckets); + this.fluentCfg = (FluentConfigurator) new FluentConfigurator(queueId).keyType(keyType) + .valueType(valueType).buckets(buckets); } /** diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/FluentConfigurator.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/FluentConfigurator.java index 718ec66..c2ae2e7 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/export/FluentConfigurator.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/export/FluentConfigurator.java @@ -58,8 +58,8 @@ class FluentConfigurator implements FluentArg1, FluentArg2, FluentArg3, FluentOp @Override public FluentOptions bucketsPerTablet(int bucketsPerTablet) { - Preconditions.checkArgument(bucketsPerTablet > 0, "bucketsPerTablet is <= 0 : " - + bucketsPerTablet); + Preconditions.checkArgument(bucketsPerTablet > 0, + "bucketsPerTablet is <= 0 : " + bucketsPerTablet); this.bucketsPerTablet = bucketsPerTablet; return this; } @@ -84,8 +84,8 @@ class FluentConfigurator implements FluentArg1, FluentArg2, FluentArg3, FluentOp Bytes exportRangeStart = Bytes.of(queueId + ExportQueue.RANGE_BEGIN); Bytes exportRangeStop = Bytes.of(queueId + ExportQueue.RANGE_END); - new TransientRegistry(appConfig).addTransientRange("exportQueue." + queueId, new RowRange( - exportRangeStart, exportRangeStop)); + new TransientRegistry(appConfig).addTransientRange("exportQueue." + queueId, + new RowRange(exportRangeStart, exportRangeStop)); TableOptimizations.registerOptimization(appConfig, queueId, Optimizer.class); } diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java index 88eae10..14acf69 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/CollisionFreeMap.java @@ -109,9 +109,8 @@ public class CollisionFreeMap<K, V> { (Combiner<K, V>) getClass().getClassLoader().loadClass(opts.combinerType).newInstance(); this.serializer = serializer; if (opts.updateObserverType != null) { - this.updateObserver = - getClass().getClassLoader().loadClass(opts.updateObserverType) - .asSubclass(UpdateObserver.class).newInstance(); + this.updateObserver = getClass().getClassLoader().loadClass(opts.updateObserverType) + .asSubclass(UpdateObserver.class).newInstance(); } else { this.updateObserver = new NullUpdateObserver<>(); } @@ -122,8 +121,8 @@ public class CollisionFreeMap<K, V> { // code is a round about way of using that copied code, with having to make anything in // CombineQueue public. CfmRegistry obsRegistry = new CfmRegistry(); - combineQ.registerObserver(obsRegistry, i -> this.combiner.combine(i.getKey(), i.iterator()), ( - tx, changes) -> this.updateObserver.updatingValues(tx, Update.transform(changes))); + combineQ.registerObserver(obsRegistry, i -> this.combiner.combine(i.getKey(), i.iterator()), + (tx, changes) -> this.updateObserver.updatingValues(tx, Update.transform(changes))); combineQueueObserver = obsRegistry.observer; } @@ -347,8 +346,8 @@ public class CollisionFreeMap<K, V> { * generated when optimizing the Accumulo table. */ public Options setBucketsPerTablet(int bucketsPerTablet) { - Preconditions.checkArgument(bucketsPerTablet > 0, "bucketsPerTablet is <= 0 : " - + bucketsPerTablet); + Preconditions.checkArgument(bucketsPerTablet > 0, + "bucketsPerTablet is <= 0 : " + bucketsPerTablet); this.bucketsPerTablet = bucketsPerTablet; return this; } @@ -361,8 +360,8 @@ public class CollisionFreeMap<K, V> { public <K, V> Options(String mapId, Class<? extends Combiner<K, V>> combiner, Class<? extends UpdateObserver<K, V>> updateObserver, Class<K> keyType, Class<V> valueType, int buckets) { - this(mapId, combiner.getName(), updateObserver.getName(), keyType.getName(), valueType - .getName(), buckets); + this(mapId, combiner.getName(), updateObserver.getName(), keyType.getName(), + valueType.getName(), buckets); } void save(SimpleConfiguration appConfig) { diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Update.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Update.java index 674f367..42bd36d 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Update.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/map/Update.java @@ -51,7 +51,7 @@ public class Update<K, V> { } static <K2, V2> Iterator<Update<K2, V2>> transform(Iterable<Change<K2, V2>> changes) { - return Iterators.transform(changes.iterator(), change -> new Update<K2, V2>(change.getKey(), - change.getOldValue(), change.getNewValue())); + return Iterators.transform(changes.iterator(), + change -> new Update<K2, V2>(change.getKey(), change.getOldValue(), change.getNewValue())); } } diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/serialization/SimpleSerializer.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/serialization/SimpleSerializer.java index aad9437..5aa5fea 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/serialization/SimpleSerializer.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/serialization/SimpleSerializer.java @@ -43,13 +43,11 @@ public interface SimpleSerializer { } static SimpleSerializer getInstance(SimpleConfiguration appConfig) { - String serType = - appConfig.getString("recipes.serializer", - "org.apache.fluo.recipes.kryo.KryoSimplerSerializer"); + String serType = appConfig.getString("recipes.serializer", + "org.apache.fluo.recipes.kryo.KryoSimplerSerializer"); try { - SimpleSerializer simplerSer = - SimpleSerializer.class.getClassLoader().loadClass(serType) - .asSubclass(SimpleSerializer.class).newInstance(); + SimpleSerializer simplerSer = SimpleSerializer.class.getClassLoader().loadClass(serType) + .asSubclass(SimpleSerializer.class).newInstance(); simplerSer.init(appConfig); return simplerSer; } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/LogEntry.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/LogEntry.java index 4848bbc..ef550e6 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/LogEntry.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/transaction/LogEntry.java @@ -73,8 +73,8 @@ public class LogEntry { public boolean equals(Object o) { if (o instanceof LogEntry) { LogEntry other = (LogEntry) o; - return ((op == other.op) && row.equals(other.row) && col.equals(other.col) && value - .equals(other.value)); + return ((op == other.op) && row.equals(other.row) && col.equals(other.col) + && value.equals(other.value)); } return false; } diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java index 5a2c9c6..04d42cc 100644 --- a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java @@ -326,8 +326,8 @@ public class TypedSnapshotBase extends AbstractSnapshotBase implements SnapshotB @SuppressWarnings({"rawtypes", "unchecked"}) private Map wrap2(Map m) { - return Collections.unmodifiableMap(DefaultedMap.decorate(m, new DefaultedMap(new Value( - (Bytes) null)))); + return Collections + .unmodifiableMap(DefaultedMap.decorate(m, new DefaultedMap(new Value((Bytes) null)))); } @SuppressWarnings("unchecked") diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/combine/SplitsTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/combine/SplitsTest.java index efb0ef7..45576a4 100644 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/combine/SplitsTest.java +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/combine/SplitsTest.java @@ -45,10 +45,8 @@ public class SplitsTest { TableOptimizations tableOptim1 = new Optimizer().getTableOptimizations("foo", fluoConfig.getAppConfiguration()); - List<Bytes> expected1 = - Lists.transform( - Arrays.asList("foo:d:1", "foo:d:2", "foo:d:~", "foo:u:1", "foo:u:2", "foo:u:~"), - Bytes::of); + List<Bytes> expected1 = Lists.transform( + Arrays.asList("foo:d:1", "foo:d:2", "foo:d:~", "foo:u:1", "foo:u:2", "foo:u:~"), Bytes::of); Assert.assertEquals(expected1, sort(tableOptim1.getSplits())); @@ -57,10 +55,8 @@ public class SplitsTest { TableOptimizations tableOptim2 = new Optimizer().getTableOptimizations("bar", fluoConfig.getAppConfiguration()); - List<Bytes> expected2 = - Lists.transform( - Arrays.asList("bar:d:2", "bar:d:4", "bar:d:~", "bar:u:2", "bar:u:4", "bar:u:~"), - Bytes::of); + List<Bytes> expected2 = Lists.transform( + Arrays.asList("bar:d:2", "bar:d:4", "bar:d:~", "bar:u:2", "bar:u:4", "bar:u:~"), Bytes::of); Assert.assertEquals(expected2, sort(tableOptim2.getSplits())); } } diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/combine/it/CombineQueueTreeIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/combine/it/CombineQueueTreeIT.java index 1379948..9c3fc40 100644 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/combine/it/CombineQueueTreeIT.java +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/combine/it/CombineQueueTreeIT.java @@ -191,9 +191,8 @@ public class CombineQueueTreeIT { Map<String, Long> ret = new HashMap<>(); m.forEach((k, v) -> { String[] fields = k.split(":"); - String nk = - (useX ? fields[0] : "") + (useY ? ((useX ? ":" : "") + fields[1]) : "") - + (useTime ? ((useX || useY ? ":" : "") + fields[2]) : ""); + String nk = (useX ? fields[0] : "") + (useY ? ((useX ? ":" : "") + fields[1]) : "") + + (useTime ? ((useX || useY ? ":" : "") + fields[2]) : ""); ret.merge(nk, v, Long::sum); }); @@ -204,7 +203,8 @@ public class CombineQueueTreeIT { Map<String, Long> ret = new HashMap<>(); String prefix = "inv:" + rollupFields + ":"; - for (RowColumnValue rcv : snap.scanner().over(Span.prefix("inv:" + rollupFields + ":")).build()) { + for (RowColumnValue rcv : snap.scanner().over(Span.prefix("inv:" + rollupFields + ":")) + .build()) { String row = rcv.getsRow(); long count = Long.valueOf(row.substring(prefix.length(), row.length())); Assert.assertNull(ret.put(rcv.getColumn().getsQualifier(), count)); diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/data/RowHasherTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/data/RowHasherTest.java index e69c9c5..2181ca0 100644 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/data/RowHasherTest.java +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/data/RowHasherTest.java @@ -27,9 +27,8 @@ public class RowHasherTest { @Test public void testBadPrefixes() { - String[] badPrefixes = - {"q:she6:test1", "q:she6:test1", "p:Mhe6:test1", "p;she6:test1", "p:she6;test1", - "p;she6;test1", "p:+he6:test1", "p:s?e6:test1", "p:sh{6:test1", "p:sh6:"}; + String[] badPrefixes = {"q:she6:test1", "q:she6:test1", "p:Mhe6:test1", "p;she6:test1", + "p:she6;test1", "p;she6;test1", "p:+he6:test1", "p:s?e6:test1", "p:sh{6:test1", "p:sh6:"}; RowHasher rh = new RowHasher("p"); for (String badPrefix : badPrefixes) { diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java index 8f092f8..1c6df54 100644 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/OptionsTest.java @@ -32,11 +32,11 @@ public class OptionsTest { ec1.setProperty("ep1", "ev1"); ec1.setProperty("ep2", 3L); - ExportQueue.configure(conf, new org.apache.fluo.recipes.core.export.ExportQueue.Options("Q1", - "ET", "KT", "VT", 100)); - ExportQueue.configure(conf, new org.apache.fluo.recipes.core.export.ExportQueue.Options("Q2", - "ET2", "KT2", "VT2", 200).setBucketsPerTablet(20).setBufferSize(1000000) - .setExporterConfiguration(ec1)); + ExportQueue.configure(conf, + new org.apache.fluo.recipes.core.export.ExportQueue.Options("Q1", "ET", "KT", "VT", 100)); + ExportQueue.configure(conf, + new org.apache.fluo.recipes.core.export.ExportQueue.Options("Q2", "ET2", "KT2", "VT2", 200) + .setBucketsPerTablet(20).setBufferSize(1000000).setExporterConfiguration(ec1)); org.apache.fluo.recipes.core.export.ExportQueue.Options opts1 = new org.apache.fluo.recipes.core.export.ExportQueue.Options("Q1", diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java index 8625c59..44480ee 100644 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/export/it/ExportTestBase.java @@ -144,8 +144,8 @@ public class ExportTestBase { ExportQueue<String, RefUpdates> refExportQueue = ExportQueue.getInstance(RefExporter.QUEUE_ID, ctx.getAppConfiguration()); - or.forColumn(new Column("content", "new"), STRONG).useObserver( - new DocumentObserver(refExportQueue)); + or.forColumn(new Column("content", "new"), STRONG) + .useObserver(new DocumentObserver(refExportQueue)); refExportQueue.registerObserver(or, new RefExporter()); } } diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/OptionsTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/OptionsTest.java index 19924ee..5c3d3b9 100644 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/OptionsTest.java +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/OptionsTest.java @@ -25,10 +25,11 @@ public class OptionsTest { public void testExportQueueOptions() { FluoConfiguration conf = new FluoConfiguration(); - CollisionFreeMap.configure(conf, new org.apache.fluo.recipes.core.map.CollisionFreeMap.Options( - "Q1", "CT", "KT", "VT", 100)); - CollisionFreeMap.configure(conf, new org.apache.fluo.recipes.core.map.CollisionFreeMap.Options( - "Q2", "CT2", "KT2", "VT2", 200).setBucketsPerTablet(20).setBufferSize(1000000)); + CollisionFreeMap.configure(conf, + new org.apache.fluo.recipes.core.map.CollisionFreeMap.Options("Q1", "CT", "KT", "VT", 100)); + CollisionFreeMap.configure(conf, + new org.apache.fluo.recipes.core.map.CollisionFreeMap.Options("Q2", "CT2", "KT2", "VT2", + 200).setBucketsPerTablet(20).setBufferSize(1000000)); org.apache.fluo.recipes.core.map.CollisionFreeMap.Options opts1 = new org.apache.fluo.recipes.core.map.CollisionFreeMap.Options("Q1", diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java index e6a6abe..5546897 100644 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/SplitsTest.java @@ -46,13 +46,10 @@ public class SplitsTest { FluoConfiguration fluoConfig = new FluoConfiguration(); CollisionFreeMap.configure(fluoConfig, opts); - TableOptimizations tableOptim1 = - new CollisionFreeMap.Optimizer().getTableOptimizations("foo", - fluoConfig.getAppConfiguration()); - List<Bytes> expected1 = - Lists.transform( - Arrays.asList("foo:d:1", "foo:d:2", "foo:d:~", "foo:u:1", "foo:u:2", "foo:u:~"), - Bytes::of); + TableOptimizations tableOptim1 = new CollisionFreeMap.Optimizer().getTableOptimizations("foo", + fluoConfig.getAppConfiguration()); + List<Bytes> expected1 = Lists.transform( + Arrays.asList("foo:d:1", "foo:d:2", "foo:d:~", "foo:u:1", "foo:u:2", "foo:u:~"), Bytes::of); Assert.assertEquals(expected1, sort(tableOptim1.getSplits())); @@ -63,13 +60,10 @@ public class SplitsTest { opts2.setBucketsPerTablet(2); CollisionFreeMap.configure(fluoConfig, opts2); - TableOptimizations tableOptim2 = - new CollisionFreeMap.Optimizer().getTableOptimizations("bar", - fluoConfig.getAppConfiguration()); - List<Bytes> expected2 = - Lists.transform( - Arrays.asList("bar:d:2", "bar:d:4", "bar:d:~", "bar:u:2", "bar:u:4", "bar:u:~"), - Bytes::of); + TableOptimizations tableOptim2 = new CollisionFreeMap.Optimizer().getTableOptimizations("bar", + fluoConfig.getAppConfiguration()); + List<Bytes> expected2 = Lists.transform( + Arrays.asList("bar:d:2", "bar:d:4", "bar:d:~", "bar:u:2", "bar:u:4", "bar:u:~"), Bytes::of); Assert.assertEquals(expected2, sort(tableOptim2.getSplits())); } } diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/BigUpdateIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/BigUpdateIT.java index ceda33f..a087519 100644 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/BigUpdateIT.java +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/BigUpdateIT.java @@ -62,8 +62,8 @@ public class BigUpdateIT { static final String MAP_ID = "bu"; - public static class LongCombiner implements - org.apache.fluo.recipes.core.map.Combiner<String, Long> { + public static class LongCombiner + implements org.apache.fluo.recipes.core.map.Combiner<String, Long> { @Override public Optional<Long> combine(String key, Iterator<Long> updates) { @@ -77,8 +77,8 @@ public class BigUpdateIT { private static AtomicInteger globalUpdates = new AtomicInteger(0); - public static class MyObserver extends - org.apache.fluo.recipes.core.map.UpdateObserver<String, Long> { + public static class MyObserver + extends org.apache.fluo.recipes.core.map.UpdateObserver<String, Long> { @Override public void updatingValues(TransactionBase tx, @@ -98,10 +98,9 @@ public class BigUpdateIT { } // get last values set to verify same as passed in old value - Map<String, Long> actualOld = - Maps.transformValues( - ttx.get().rowsString(expectedOld.keySet()).columns(ImmutableSet.of(DSCOL)) - .toStringMap(), m -> m.get(DSCOL).toLong()); + Map<String, Long> actualOld = Maps.transformValues( + ttx.get().rowsString(expectedOld.keySet()).columns(ImmutableSet.of(DSCOL)).toStringMap(), + m -> m.get(DSCOL).toLong()); MapDifference<String, Long> diff = Maps.difference(expectedOld, actualOld); @@ -122,15 +121,16 @@ public class BigUpdateIT { SimpleSerializer.setSerializer(props, TestSerializer.class); - org.apache.fluo.recipes.core.map.CollisionFreeMap.configure(props, - new org.apache.fluo.recipes.core.map.CollisionFreeMap.Options(MAP_ID, LongCombiner.class, - MyObserver.class, String.class, Long.class, 2).setBufferSize(1 << 10)); + org.apache.fluo.recipes.core.map.CollisionFreeMap + .configure(props, + new org.apache.fluo.recipes.core.map.CollisionFreeMap.Options(MAP_ID, + LongCombiner.class, MyObserver.class, String.class, Long.class, 2) + .setBufferSize(1 << 10)); miniFluo = FluoFactory.newMiniFluo(props); - wcMap = - org.apache.fluo.recipes.core.map.CollisionFreeMap.getInstance(MAP_ID, - props.getAppConfiguration()); + wcMap = org.apache.fluo.recipes.core.map.CollisionFreeMap.getInstance(MAP_ID, + props.getAppConfiguration()); globalUpdates.set(0); } @@ -193,8 +193,8 @@ public class BigUpdateIT { for (ColumnValue columnValue : columns) { Assert.assertEquals(new Column("debug", "sum"), columnValue.getColumn()); - Assert - .assertEquals("row : " + columns.getsRow(), "" + expectedVal, columnValue.getsValue()); + Assert.assertEquals("row : " + columns.getsRow(), "" + expectedVal, + columnValue.getsValue()); } } diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/CollisionFreeMapIT.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/CollisionFreeMapIT.java index 787cbb0..298580f 100644 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/CollisionFreeMapIT.java +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/CollisionFreeMapIT.java @@ -58,8 +58,8 @@ public class CollisionFreeMapIT { props.setWorkerThreads(20); props.setMiniDataDir("target/mini"); - props.addObserver(new org.apache.fluo.api.config.ObserverSpecification(DocumentObserver.class - .getName())); + props.addObserver( + new org.apache.fluo.api.config.ObserverSpecification(DocumentObserver.class.getName())); SimpleSerializer.setSerializer(props, TestSerializer.class); @@ -69,9 +69,8 @@ public class CollisionFreeMapIT { miniFluo = FluoFactory.newMiniFluo(props); - wcMap = - org.apache.fluo.recipes.core.map.CollisionFreeMap.getInstance(MAP_ID, - props.getAppConfiguration()); + wcMap = org.apache.fluo.recipes.core.map.CollisionFreeMap.getInstance(MAP_ID, + props.getAppConfiguration()); } @After diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentObserver.java index dbb1034..aff12c8 100644 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentObserver.java +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/DocumentObserver.java @@ -33,9 +33,8 @@ public class DocumentObserver extends TypedObserver { @Override public void init(Context context) throws Exception { - wcm = - org.apache.fluo.recipes.core.map.CollisionFreeMap.getInstance(CollisionFreeMapIT.MAP_ID, - context.getAppConfiguration()); + wcm = org.apache.fluo.recipes.core.map.CollisionFreeMap.getInstance(CollisionFreeMapIT.MAP_ID, + context.getAppConfiguration()); } @Override @@ -79,8 +78,8 @@ public class DocumentObserver extends TypedObserver { MapDifference<String, Long> diffs = Maps.difference(currCounts, newCounts); // compute the diffs for words that changed - changes.putAll(Maps.transformValues(diffs.entriesDiffering(), vDiff -> vDiff.rightValue() - - vDiff.leftValue())); + changes.putAll(Maps.transformValues(diffs.entriesDiffering(), + vDiff -> vDiff.rightValue() - vDiff.leftValue())); // add all new words changes.putAll(diffs.entriesOnlyOnRight()); diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountObserver.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountObserver.java index 6d48688..0c16b1d 100644 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountObserver.java +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/map/it/WordCountObserver.java @@ -24,8 +24,8 @@ import org.apache.fluo.api.data.Column; @Deprecated // TODO move to CombineQueue test when removing CFM -public class WordCountObserver extends - org.apache.fluo.recipes.core.map.UpdateObserver<String, Long> { +public class WordCountObserver + extends org.apache.fluo.recipes.core.map.UpdateObserver<String, Long> { @Override public void updatingValues(TransactionBase tx, diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java index 4fd8aa9..e2f2de9 100644 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/transaction/RecordingTransactionTest.java @@ -67,17 +67,17 @@ public class RecordingTransactionTest { List<LogEntry> entries = rtx.getTxLog().getLogEntries(); Assert.assertEquals(4, entries.size()); Assert.assertEquals("LogEntry{op=SET, row=r1, col=cf1 , value=v1}", entries.get(0).toString()); - Assert.assertEquals("LogEntry{op=SET, row=r2, col=cf2 cq2 , value=v2}", entries.get(1) - .toString()); - Assert - .assertEquals("LogEntry{op=DELETE, row=r3, col=cf3 , value=}", entries.get(2).toString()); + Assert.assertEquals("LogEntry{op=SET, row=r2, col=cf2 cq2 , value=v2}", + entries.get(1).toString()); + Assert.assertEquals("LogEntry{op=DELETE, row=r3, col=cf3 , value=}", + entries.get(2).toString()); Assert.assertEquals("LogEntry{op=GET, row=r4, col=cf4 , value=v4}", entries.get(3).toString()); - Assert.assertEquals("{r4 cf4 =v4}", rtx.getTxLog().getOperationMap(LogEntry.Operation.GET) - .toString()); - Assert.assertEquals("{r1 cf1 =v1, r2 cf2 cq2 =v2}", new TreeMap<>(rtx.getTxLog() - .getOperationMap(LogEntry.Operation.SET)).toString()); - Assert.assertEquals("{r3 cf3 =}", rtx.getTxLog().getOperationMap(LogEntry.Operation.DELETE) - .toString()); + Assert.assertEquals("{r4 cf4 =v4}", + rtx.getTxLog().getOperationMap(LogEntry.Operation.GET).toString()); + Assert.assertEquals("{r1 cf1 =v1, r2 cf2 cq2 =v2}", + new TreeMap<>(rtx.getTxLog().getOperationMap(LogEntry.Operation.SET)).toString()); + Assert.assertEquals("{r3 cf3 =}", + rtx.getTxLog().getOperationMap(LogEntry.Operation.DELETE).toString()); } @Test @@ -87,10 +87,10 @@ public class RecordingTransactionTest { ttx.mutate().row("r6").fam("cf6").qual("cq6").set("1"); List<LogEntry> entries = rtx.getTxLog().getLogEntries(); Assert.assertEquals(2, entries.size()); - Assert.assertEquals("LogEntry{op=SET, row=r5, col=cf5 cq5 , value=1}", entries.get(0) - .toString()); - Assert.assertEquals("LogEntry{op=SET, row=r6, col=cf6 cq6 , value=1}", entries.get(1) - .toString()); + Assert.assertEquals("LogEntry{op=SET, row=r5, col=cf5 cq5 , value=1}", + entries.get(0).toString()); + Assert.assertEquals("LogEntry{op=SET, row=r6, col=cf6 cq6 , value=1}", + entries.get(1).toString()); } @Test @@ -102,10 +102,10 @@ public class RecordingTransactionTest { ttx.mutate().row("r3").fam("cfa").qual("cq3").set("3"); List<LogEntry> entries = rtx.getTxLog().getLogEntries(); Assert.assertEquals(2, entries.size()); - Assert.assertEquals("LogEntry{op=SET, row=r1, col=cfa cq1 , value=1}", entries.get(0) - .toString()); - Assert.assertEquals("LogEntry{op=SET, row=r3, col=cfa cq3 , value=3}", entries.get(1) - .toString()); + Assert.assertEquals("LogEntry{op=SET, row=r1, col=cfa cq1 , value=1}", + entries.get(0).toString()); + Assert.assertEquals("LogEntry{op=SET, row=r3, col=cfa cq3 , value=3}", + entries.get(1).toString()); } @Test @@ -150,8 +150,8 @@ public class RecordingTransactionTest { @Test public void testGetRows() { - expect(tx.get(Collections.emptyList(), Collections.emptySet())).andReturn( - Collections.emptyMap()); + expect(tx.get(Collections.emptyList(), Collections.emptySet())) + .andReturn(Collections.emptyMap()); replay(tx); Assert.assertEquals(Collections.emptyMap(), rtx.get(Collections.emptyList(), Collections.emptySet())); @@ -180,8 +180,8 @@ public class RecordingTransactionTest { Assert.assertFalse(rtx.getTxLog().isEmpty()); List<LogEntry> entries = rtx.getTxLog().getLogEntries(); Assert.assertEquals(1, entries.size()); - Assert.assertEquals("LogEntry{op=GET, row=r7, col=cf7 cq7 , value=v7}", entries.get(0) - .toString()); + Assert.assertEquals("LogEntry{op=GET, row=r7, col=cf7 cq7 , value=v7}", + entries.get(0).toString()); verify(tx, sb); } @@ -194,8 +194,8 @@ public class RecordingTransactionTest { ScannerBuilder sb = mock(ScannerBuilder.class); expect(cs.getRow()).andReturn(Bytes.of("r7")).times(2); - expect(cs.iterator()).andReturn( - Iterators.singletonIterator(new ColumnValue(new Column("cf7", "cq7"), "v7"))); + expect(cs.iterator()) + .andReturn(Iterators.singletonIterator(new ColumnValue(new Column("cf7", "cq7"), "v7"))); expect(rs.iterator()).andReturn(Iterators.singletonIterator(cs)); expect(rsb.build()).andReturn(rs); expect(sb.byRow()).andReturn(rsb); @@ -213,8 +213,8 @@ public class RecordingTransactionTest { Assert.assertEquals(new ColumnValue(new Column("cf7", "cq7"), "v7"), citer.next()); List<LogEntry> entries = rtx.getTxLog().getLogEntries(); Assert.assertEquals(1, entries.size()); - Assert.assertEquals("LogEntry{op=GET, row=r7, col=cf7 cq7 , value=v7}", entries.get(0) - .toString()); + Assert.assertEquals("LogEntry{op=GET, row=r7, col=cf7 cq7 , value=v7}", + entries.get(0).toString()); verify(tx, sb, rsb, rs, cs); } diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/core/types/TypeLayerTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/TypeLayerTest.java index 8b3decf..2372b07 100644 --- a/modules/core/src/test/java/org/apache/fluo/recipes/core/types/TypeLayerTest.java +++ b/modules/core/src/test/java/org/apache/fluo/recipes/core/types/TypeLayerTest.java @@ -38,9 +38,8 @@ public class TypeLayerTest { TypedTransactionBase ttx = tl.wrap(tt); - Map<Column, Value> results = - ttx.get().row("r2") - .columns(ImmutableSet.of(new Column("cf2", "6"), new Column("cf2", "7"))); + Map<Column, Value> results = ttx.get().row("r2") + .columns(ImmutableSet.of(new Column("cf2", "6"), new Column("cf2", "7"))); Assert.assertNull(results.get(new Column("cf2", "6")).toInteger()); Assert.assertEquals(0, results.get(new Column("cf2", "6")).toInteger(0)); @@ -49,12 +48,8 @@ public class TypeLayerTest { Assert.assertEquals(1, results.size()); - results = - ttx.get() - .row("r2") - .columns( - ImmutableSet.of(new Column("cf2", "6"), new Column("cf2", "7"), new Column("cf2", - "8"))); + results = ttx.get().row("r2").columns( + ImmutableSet.of(new Column("cf2", "6"), new Column("cf2", "7"), new Column("cf2", "8"))); Assert.assertNull(results.get(new Column("cf2", "6")).toInteger()); Assert.assertEquals(0, results.get(new Column("cf2", "6")).toInteger(0)); @@ -66,9 +61,8 @@ public class TypeLayerTest { Assert.assertEquals(2, results.size()); // test var args - Map<Column, Value> results2 = - ttx.get().row("r2") - .columns(new Column("cf2", "6"), new Column("cf2", "7"), new Column("cf2", "8")); + Map<Column, Value> results2 = ttx.get().row("r2").columns(new Column("cf2", "6"), + new Column("cf2", "7"), new Column("cf2", "8")); Assert.assertEquals(results, results2); } @@ -82,31 +76,28 @@ public class TypeLayerTest { Assert.assertNull(ttx.get().row("r1").fam("cf1").qual("cq1").toString()); Assert.assertEquals("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A").toString()); - Assert.assertEquals("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A".getBytes()) - .toString()); - Assert.assertEquals("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A")) - .toString()); + Assert.assertEquals("v1", + ttx.get().row("r1").fam("cf1").qual("cq1").vis("A".getBytes()).toString()); + Assert.assertEquals("v1", + ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A")).toString()); Assert.assertEquals("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A".getBytes())).toString()); Assert.assertNull("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B").toString()); - Assert.assertNull("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B".getBytes()) - .toString()); - Assert.assertNull("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&B")) - .toString()); Assert.assertNull("v1", - ttx.get().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&B".getBytes())) - .toString()); + ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B".getBytes()).toString()); + Assert.assertNull("v1", + ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&B")).toString()); + Assert.assertNull("v1", ttx.get().row("r1").fam("cf1").qual("cq1") + .vis(ByteBuffer.wrap("A&B".getBytes())).toString()); Assert.assertEquals("v3", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B").toString("v3")); - Assert.assertEquals("v3", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B".getBytes()) - .toString("v3")); - Assert.assertEquals("v3", ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&B")) - .toString("v3")); - Assert.assertEquals( - "v3", - ttx.get().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&B".getBytes())) - .toString("v3")); + Assert.assertEquals("v3", + ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B".getBytes()).toString("v3")); + Assert.assertEquals("v3", + ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&B")).toString("v3")); + Assert.assertEquals("v3", ttx.get().row("r1").fam("cf1").qual("cq1") + .vis(ByteBuffer.wrap("A&B".getBytes())).toString("v3")); ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&B").set(3); ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&C".getBytes()).set(4); @@ -134,8 +125,8 @@ public class TypeLayerTest { public void testBuildColumn() { TypeLayer tl = new TypeLayer(new StringEncoder()); - Assert.assertEquals(new Column("f0", "q0"), tl.bc().fam("f0".getBytes()).qual("q0".getBytes()) - .vis()); + Assert.assertEquals(new Column("f0", "q0"), + tl.bc().fam("f0".getBytes()).qual("q0".getBytes()).vis()); Assert.assertEquals(new Column("f0", "q0"), tl.bc().fam("f0").qual("q0").vis()); Assert.assertEquals(new Column("5", "7"), tl.bc().fam(5).qual(7).vis()); Assert.assertEquals(new Column("5", "7"), tl.bc().fam(5l).qual(7l).vis()); @@ -156,10 +147,9 @@ public class TypeLayerTest { public void testRead() throws Exception { TypeLayer tl = new TypeLayer(new StringEncoder()); - MockSnapshot ms = - new MockSnapshot("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12", - "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20", - "r3,cf3:cq3,28.195", "r4,cf4:cq4,true"); + MockSnapshot ms = new MockSnapshot("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", + "r2,cf2:7,12", "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20", + "r3,cf3:cq3,28.195", "r4,cf4:cq4,true"); TypedSnapshot tts = tl.wrap(ms); @@ -182,17 +172,14 @@ public class TypeLayerTest { Assert.assertEquals("13", new String(tts.get().row("r2").fam("cf2").qual(8).toBytes())); Assert.assertEquals("13", new String(tts.get().row("r2").fam("cf2").qual(8).toBytes("14".getBytes()))); - Assert - .assertEquals("13", new String(tts.get().row("r2").col(new Column("cf2", "8")).toBytes())); + Assert.assertEquals("13", + new String(tts.get().row("r2").col(new Column("cf2", "8")).toBytes())); Assert.assertEquals("13", new String(tts.get().row("r2").col(new Column("cf2", "8")).toBytes("14".getBytes()))); Assert.assertEquals("13", Bytes.of(tts.get().row("r2").col(new Column("cf2", "8")).toByteBuffer()).toString()); - Assert.assertEquals( - "13", - Bytes.of( - tts.get().row("r2").col(new Column("cf2", "8")) - .toByteBuffer(ByteBuffer.wrap("14".getBytes()))).toString()); + Assert.assertEquals("13", Bytes.of(tts.get().row("r2").col(new Column("cf2", "8")) + .toByteBuffer(ByteBuffer.wrap("14".getBytes()))).toString()); // test non-existent Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toInteger()); @@ -208,11 +195,8 @@ public class TypeLayerTest { Assert.assertEquals("14", new String(tts.get().row("r2").col(new Column("cf3", "8")).toBytes("14".getBytes()))); Assert.assertNull(tts.get().row("r2").col(new Column("cf3", "8")).toByteBuffer()); - Assert.assertEquals( - "14", - Bytes.of( - tts.get().row("r2").col(new Column("cf3", "8")) - .toByteBuffer(ByteBuffer.wrap("14".getBytes()))).toString()); + Assert.assertEquals("14", Bytes.of(tts.get().row("r2").col(new Column("cf3", "8")) + .toByteBuffer(ByteBuffer.wrap("14".getBytes()))).toString()); // test float & double Assert.assertEquals((Float) 28.195f, tts.get().row("r3").fam("cf3").qual("cq3").toFloat()); @@ -231,24 +215,24 @@ public class TypeLayerTest { Assert.assertEquals("20", tts.get().row(13l).fam("9").qual("17").toString()); Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17").toString()); Assert.assertEquals("20", tts.get().row("13".getBytes()).fam("9").qual("17").toString()); - Assert.assertEquals("20", tts.get().row(ByteBuffer.wrap("13".getBytes())).fam("9").qual("17") - .toString()); + Assert.assertEquals("20", + tts.get().row(ByteBuffer.wrap("13".getBytes())).fam("9").qual("17").toString()); // try different types for cf Assert.assertEquals("20", tts.get().row("13").fam(9).qual("17").toString()); Assert.assertEquals("20", tts.get().row("13").fam(9l).qual("17").toString()); Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17").toString()); Assert.assertEquals("20", tts.get().row("13").fam("9".getBytes()).qual("17").toString()); - Assert.assertEquals("20", tts.get().row("13").fam(ByteBuffer.wrap("9".getBytes())).qual("17") - .toString()); + Assert.assertEquals("20", + tts.get().row("13").fam(ByteBuffer.wrap("9".getBytes())).qual("17").toString()); // try different types for cq Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17").toString()); Assert.assertEquals("20", tts.get().row("13").fam("9").qual(17l).toString()); Assert.assertEquals("20", tts.get().row("13").fam("9").qual(17).toString()); Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17".getBytes()).toString()); - Assert.assertEquals("20", tts.get().row("13").fam("9").qual(ByteBuffer.wrap("17".getBytes())) - .toString()); + Assert.assertEquals("20", + tts.get().row("13").fam("9").qual(ByteBuffer.wrap("17".getBytes())).toString()); ms.close(); tts.close(); @@ -275,8 +259,8 @@ public class TypeLayerTest { // increment non existent Assert.assertEquals(0, ttx.mutate().row("13").col(new Column("9", "22")).increment(6)); // increment non existent - Assert.assertEquals(0, ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("23".getBytes())) - .increment(7)); + Assert.assertEquals(0, + ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("23".getBytes())).increment(7)); Assert.assertEquals(MockTransactionBase.toRCVM("13,9:17,21", "13,9:18,32", "13,9:19,43", "13,9:20,54", "13,9:21,5", "13,9:22,6", "13,9:23,7"), tt.setData); @@ -292,8 +276,8 @@ public class TypeLayerTest { // increment non existent Assert.assertEquals(0l, ttx.mutate().row("13").col(new Column("9", "22")).increment(6l)); // increment non existent - Assert.assertEquals(0l, ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("23".getBytes())) - .increment(7l)); + Assert.assertEquals(0l, + ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("23".getBytes())).increment(7l)); Assert.assertEquals(MockTransactionBase.toRCVM("13,9:17,21", "13,9:18,32", "13,9:19,43", "13,9:20,54", "13,9:21,5", "13,9:22,6", "13,9:23,7"), tt.setData); @@ -312,9 +296,10 @@ public class TypeLayerTest { ttx.mutate().row("13").fam("9").qual("24").set(-6.135d); ttx.mutate().row("13").fam("9").qual("25").set(false); - Assert.assertEquals(MockTransactionBase.toRCVM("13,9:16,", "13,9:17,3", "13,9:18,4", - "13,9:19,5", "13,9:20,6", "13,9:21,7", "13,9:22,8", "13,9:23,2.54", "13,9:24,-6.135", - "13,9:25,false"), tt.setData); + Assert.assertEquals( + MockTransactionBase.toRCVM("13,9:16,", "13,9:17,3", "13,9:18,4", "13,9:19,5", "13,9:20,6", + "13,9:21,7", "13,9:22,8", "13,9:23,2.54", "13,9:24,-6.135", "13,9:25,false"), + tt.setData); tt.setData.clear(); // test deleting data @@ -325,9 +310,9 @@ public class TypeLayerTest { ttx.mutate().row("13").col(new Column("9", "21")).delete(); ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("22".getBytes())).delete(); - Assert - .assertEquals(MockTransactionBase.toRCM("13,9:17", "13,9:18", "13,9:19", "13,9:20", - "13,9:21", "13,9:22"), tt.deletes); + Assert.assertEquals( + MockTransactionBase.toRCM("13,9:17", "13,9:18", "13,9:19", "13,9:20", "13,9:21", "13,9:22"), + tt.deletes); tt.deletes.clear(); Assert.assertEquals(0, tt.setData.size()); Assert.assertEquals(0, tt.weakNotifications.size()); @@ -340,9 +325,9 @@ public class TypeLayerTest { ttx.mutate().row("13").col(new Column("9", "21")).weaklyNotify(); ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("22".getBytes())).weaklyNotify(); - Assert - .assertEquals(MockTransactionBase.toRCM("13,9:17", "13,9:18", "13,9:19", "13,9:20", - "13,9:21", "13,9:22"), tt.weakNotifications); + Assert.assertEquals( + MockTransactionBase.toRCM("13,9:17", "13,9:18", "13,9:19", "13,9:20", "13,9:21", "13,9:22"), + tt.weakNotifications); tt.weakNotifications.clear(); Assert.assertEquals(0, tt.setData.size()); Assert.assertEquals(0, tt.deletes.size()); @@ -352,9 +337,8 @@ public class TypeLayerTest { public void testMultiRow() throws Exception { TypeLayer tl = new TypeLayer(new StringEncoder()); - MockTransactionBase tt = - new MockTransactionBase("11,cf1:cq1,1", "11,cf1:cq2,2", "12,cf1:cq1,3", "12,cf1:cq2,4", - "13,cf1:cq1,5", "13,cf1:cq2,6"); + MockTransactionBase tt = new MockTransactionBase("11,cf1:cq1,1", "11,cf1:cq2,2", "12,cf1:cq1,3", + "12,cf1:cq2,4", "13,cf1:cq1,5", "13,cf1:cq2,6"); TypedTransactionBase ttx = tl.wrap(tt); @@ -422,12 +406,11 @@ public class TypeLayerTest { Assert.assertEquals((Long) (1l), map4.get(11).get(c1).toLong()); Assert.assertEquals(5l, map4.get(13).get(c1).toLong(6)); - Map<Integer, Map<Column, Value>> map5 = - ttx.get().rowsBytes(Arrays.asList("11".getBytes(), "13".getBytes())).columns(c1) - .toIntegerMap(); + Map<Integer, Map<Column, Value>> map5 = ttx.get() + .rowsBytes(Arrays.asList("11".getBytes(), "13".getBytes())).columns(c1).toIntegerMap(); - Assert.assertEquals(map5, ttx.get().rowsBytes("11".getBytes(), "13".getBytes()).columns(c1) - .toIntegerMap()); + Assert.assertEquals(map5, + ttx.get().rowsBytes("11".getBytes(), "13".getBytes()).columns(c1).toIntegerMap()); Assert.assertEquals(2, map5.size()); Assert.assertEquals(1, map5.get(11).size()); @@ -435,14 +418,12 @@ public class TypeLayerTest { Assert.assertEquals((Long) (1l), map5.get(11).get(c1).toLong()); Assert.assertEquals(5l, map5.get(13).get(c1).toLong(6)); - Map<Integer, Map<Column, Value>> map6 = - ttx.get() - .rowsByteBuffers( - Arrays.asList(ByteBuffer.wrap("11".getBytes()), ByteBuffer.wrap("13".getBytes()))) - .columns(c1).toIntegerMap(); + Map<Integer, Map<Column, Value>> map6 = ttx.get() + .rowsByteBuffers( + Arrays.asList(ByteBuffer.wrap("11".getBytes()), ByteBuffer.wrap("13".getBytes()))) + .columns(c1).toIntegerMap(); - Assert.assertEquals( - map6, + Assert.assertEquals(map6, ttx.get() .rowsByteBuffers(ByteBuffer.wrap("11".getBytes()), ByteBuffer.wrap("13".getBytes())) .columns(c1).toIntegerMap()); @@ -481,9 +462,8 @@ public class TypeLayerTest { Assert.assertEquals(MockTransactionBase.toRCVM("r6,cf2:7,3"), tt.setData); tt.setData.clear(); - Map<Bytes, Map<Column, Bytes>> map2 = - ttx.get(ImmutableSet.of(Bytes.of("r1"), Bytes.of("r2")), - ImmutableSet.of(new Column("cf1", "cq1"), new Column("cf2", "8"))); + Map<Bytes, Map<Column, Bytes>> map2 = ttx.get(ImmutableSet.of(Bytes.of("r1"), Bytes.of("r2")), + ImmutableSet.of(new Column("cf1", "cq1"), new Column("cf2", "8"))); Assert.assertEquals(MockTransactionBase.toRCVM("r1,cf1:cq1,v1", "r2,cf2:8,13"), map2); ttx.delete(Bytes.of("r6"), new Column("cf2", "7")); diff --git a/modules/kryo/src/main/java/org/apache/fluo/recipes/kryo/KryoSimplerSerializer.java b/modules/kryo/src/main/java/org/apache/fluo/recipes/kryo/KryoSimplerSerializer.java index e8d7997..ca3deeb 100644 --- a/modules/kryo/src/main/java/org/apache/fluo/recipes/kryo/KryoSimplerSerializer.java +++ b/modules/kryo/src/main/java/org/apache/fluo/recipes/kryo/KryoSimplerSerializer.java @@ -56,8 +56,8 @@ public class KryoSimplerSerializer implements SimpleSerializer, Serializable { private KryoPool getPool() { Preconditions.checkState(factory != null || factoryType != null, "KryFactory not initialized"); if (factory == null) { - return pools.computeIfAbsent(factoryType, ft -> new KryoPool.Builder(getFactory(ft)) - .softReferences().build()); + return pools.computeIfAbsent(factoryType, + ft -> new KryoPool.Builder(getFactory(ft)).softReferences().build()); } else { return pools.computeIfAbsent(factory.getClass().getName(), ft -> new KryoPool.Builder(factory).softReferences().build()); diff --git a/modules/spark/src/main/java/org/apache/fluo/recipes/spark/FluoSparkHelper.java b/modules/spark/src/main/java/org/apache/fluo/recipes/spark/FluoSparkHelper.java index 587f28c..4467868 100644 --- a/modules/spark/src/main/java/org/apache/fluo/recipes/spark/FluoSparkHelper.java +++ b/modules/spark/src/main/java/org/apache/fluo/recipes/spark/FluoSparkHelper.java @@ -103,10 +103,9 @@ public class FluoSparkHelper { } private static Instance getInstance(FluoConfiguration config) { - ClientConfiguration clientConfig = - new ClientConfiguration().withInstance(config.getAccumuloInstance()) - .withZkHosts(config.getAccumuloZookeepers()) - .withZkTimeout(config.getZookeeperTimeout() / 1000); + ClientConfiguration clientConfig = new ClientConfiguration() + .withInstance(config.getAccumuloInstance()).withZkHosts(config.getAccumuloZookeepers()) + .withZkTimeout(config.getZookeeperTimeout() / 1000); return new ZooKeeperInstance(clientConfig); } @@ -325,9 +324,8 @@ public class FluoSparkHelper { // partition and sort data so that one file is created per an accumulo tablet Partitioner accumuloPartitioner; try { - accumuloPartitioner = - new AccumuloRangePartitioner(chooseConnector(opts).tableOperations().listSplits( - accumuloTable)); + accumuloPartitioner = new AccumuloRangePartitioner( + chooseConnector(opts).tableOperations().listSplits(accumuloTable)); } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException e) { throw new IllegalStateException(e); } diff --git a/modules/spark/src/test/java/org/apache/fluo/recipes/spark/it/FluoSparkHelperIT.java b/modules/spark/src/test/java/org/apache/fluo/recipes/spark/it/FluoSparkHelperIT.java index 018e9f4..d1b8387 100644 --- a/modules/spark/src/test/java/org/apache/fluo/recipes/spark/it/FluoSparkHelperIT.java +++ b/modules/spark/src/test/java/org/apache/fluo/recipes/spark/it/FluoSparkHelperIT.java @@ -60,10 +60,10 @@ public class FluoSparkHelperIT extends AccumuloExportITBase { List<RowColumnValue> expected = getData(); final String accumuloTable = "table1"; getAccumuloConnector().tableOperations().create(accumuloTable); - fsh.bulkImportRcvToAccumulo(FluoSparkHelper.toPairRDD(ctx.parallelize(expected)), - accumuloTable, new FluoSparkHelper.BulkImportOptions()); - Assert.assertTrue(FluoITHelper.verifyAccumuloTable(getAccumuloConnector(), accumuloTable, - expected)); + fsh.bulkImportRcvToAccumulo(FluoSparkHelper.toPairRDD(ctx.parallelize(expected)), accumuloTable, + new FluoSparkHelper.BulkImportOptions()); + Assert.assertTrue( + FluoITHelper.verifyAccumuloTable(getAccumuloConnector(), accumuloTable, expected)); } @Test diff --git a/modules/test/src/main/java/org/apache/fluo/recipes/test/FluoITHelper.java b/modules/test/src/main/java/org/apache/fluo/recipes/test/FluoITHelper.java index 430811c..3d3ae88 100644 --- a/modules/test/src/main/java/org/apache/fluo/recipes/test/FluoITHelper.java +++ b/modules/test/src/main/java/org/apache/fluo/recipes/test/FluoITHelper.java @@ -123,8 +123,8 @@ public class FluoITHelper { retval |= diff("fluo val", rcv.getValue(), actualRcv.getValue()); if (retval) { - log.error("Difference found - row {} cf {} cq {} val {}", rcv.getsRow(), rcv.getColumn() - .getsFamily(), rcv.getColumn().getsQualifier(), rcv.getsValue()); + log.error("Difference found - row {} cf {} cq {} val {}", rcv.getsRow(), + rcv.getColumn().getsFamily(), rcv.getColumn().getsQualifier(), rcv.getsValue()); return false; } @@ -214,12 +214,12 @@ public class FluoITHelper { retval |= diff("val", rcv.getValue().toString(), kvEntry.getValue().toString()); if (retval) { - log.error("Difference found - row {} cf {} cq {} val {}", rcv.getRow().toString(), col - .getFamily().toString(), col.getQualifier().toString(), rcv.getValue().toString()); + log.error("Difference found - row {} cf {} cq {} val {}", rcv.getRow().toString(), + col.getFamily().toString(), col.getQualifier().toString(), rcv.getValue().toString()); return false; } - log.debug("Verified row {} cf {} cq {} val {}", rcv.getRow().toString(), col.getFamily() - .toString(), col.getQualifier().toString(), rcv.getValue().toString()); + log.debug("Verified row {} cf {} cq {} val {}", rcv.getRow().toString(), + col.getFamily().toString(), col.getQualifier().toString(), rcv.getValue().toString()); } if (scanIter.hasNext() || rcvIter.hasNext()) { @@ -302,9 +302,8 @@ public class FluoITHelper { } Iterator<String> iter = cols.iterator(); - RowColumnValue rcv = - new RowColumnValue(Bytes.of(iter.next()), new Column(iter.next(), iter.next()), - Bytes.of(iter.next())); + RowColumnValue rcv = new RowColumnValue(Bytes.of(iter.next()), + new Column(iter.next(), iter.next()), Bytes.of(iter.next())); ret.add(rcv); } diff --git a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java index 09200a1..0e4c8f2 100644 --- a/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java +++ b/modules/test/src/test/java/org/apache/fluo/recipes/test/export/AccumuloExporterIT.java @@ -53,12 +53,12 @@ public class AccumuloExporterIT extends AccumuloExportITBase { ExportQueue<String, String> teq = ExportQueue.getInstance(QUEUE_ID, appCfg); - teq.registerObserver(obsRegistry, new AccumuloExporter<>(QUEUE_ID, appCfg, (export, - mutConsumer) -> { - Mutation m = new Mutation(export.getKey()); - m.put("cf", "cq", export.getSequence(), export.getValue()); - mutConsumer.accept(m); - })); + teq.registerObserver(obsRegistry, + new AccumuloExporter<>(QUEUE_ID, appCfg, (export, mutConsumer) -> { + Mutation m = new Mutation(export.getKey()); + m.put("cf", "cq", export.getSequence(), export.getValue()); + mutConsumer.accept(m); + })); } } @@ -145,15 +145,15 @@ public class AccumuloExporterIT extends AccumuloExportITBase { } } - private void export(ExportQueue<String, String> teq, Transaction tx, - Map<String, String> expected, String k, String v) { + private void export(ExportQueue<String, String> teq, Transaction tx, Map<String, String> expected, + String k, String v) { teq.add(tx, k, v); expected.put(k, v); } private Collection<Text> getFluoSplits() throws Exception { - return getAccumuloConnector().tableOperations().listSplits( - getFluoConfiguration().getAccumuloTable()); + return getAccumuloConnector().tableOperations() + .listSplits(getFluoConfiguration().getAccumuloTable()); } private Map<String, String> getExports() throws Exception { -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
