This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new edc16e5734 Adds IT that verifies scans see data written by concurrent
writers (#3639)
edc16e5734 is described below
commit edc16e5734c2d7291731b409de3c0f45a3fbc4a1
Author: Keith Turner <[email protected]>
AuthorDate: Mon Jul 24 17:53:27 2023 -0400
Adds IT that verifies scans see data written by concurrent writers (#3639)
The IT ensures that, while writes, scans, and table operations run
concurrently, a scan always sees all data written before that scan
started.
---
.../apache/accumulo/test/ScanConsistencyIT.java | 697 +++++++++++++++++++++
1 file changed, 697 insertions(+)
diff --git a/test/src/main/java/org/apache/accumulo/test/ScanConsistencyIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanConsistencyIT.java
new file mode 100644
index 0000000000..41719d07e3
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/ScanConsistencyIT.java
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.harness.AccumuloITBase.SUNNY_DAY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.client.rfile.RFileWriter;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.MoreCollectors;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * This test verifies that scans will always see data written before the scan
started even when
+ * there are concurrent scans, writes, and table operations running.
+ */
+@Tag(SUNNY_DAY)
+public class ScanConsistencyIT extends AccumuloClusterHarness {
+
+ private static final Logger log =
LoggerFactory.getLogger(ScanConsistencyIT.class);
+
+ /**
+ * Creates a table, then runs between 1 and 10 write tasks, between 1 and 10 scan tasks, and one
+ * table operations task concurrently for 60 seconds and asserts on the statistics each task
+ * returned. Finishes with one plain and one isolated full table scan that must report identical
+ * counts, with every scanned entry being an expected one.
+ */
+ @SuppressFBWarnings(value = {"PREDICTABLE_RANDOM",
"DMI_RANDOM_USED_ONLY_ONCE"},
+ justification = "predictable random is ok for testing")
+ @Test
+ public void testConcurrentScanConsistency() throws Exception {
+ final String table = this.getUniqueNames(1)[0];
+
+ /**
+ * Tips for debugging this test when it sees a row that should not exist or does not see a
+ * row that should exist.
+ *
+ * 1. Disable the GC from running for the test.
+ *
+ * 2. Modify the test code to print some of the offending rows, just need a few to start
+ * investigating.
+ *
+ * 3. After the test fails, somehow run the static function findRow() passing it the Accumulo
+ * table directory that the failed test used and one of the problem rows.
+ *
+ * 4. Once the files containing the row are found, analyze what happened with those files in
+ * the server logs.
+ */
+ // getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR);
+
+ var executor = Executors.newCachedThreadPool();
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+ client.tableOperations().create(table);
+
+ TestContext testContext = new TestContext(client, table,
getCluster().getFileSystem(),
+ getCluster().getTemporaryPath().toString());
+
+ List<Future<WriteStats>> writeTasks = new ArrayList<>();
+ List<Future<ScanStats>> scanTasks = new ArrayList<>();
+
+ Random random = new Random();
+
+ int numWriteTask = random.nextInt(10) + 1;
+ int numsScanTask = random.nextInt(10) + 1;
+
+ for (int i = 0; i < numWriteTask; i++) {
+ writeTasks.add(executor.submit(new WriteTask(testContext)));
+ }
+
+ for (int i = 0; i < numsScanTask; i++) {
+ scanTasks.add(executor.submit(new ScanTask(testContext)));
+ }
+
+ var tableOpsTask = executor.submit(new TableOpsTask(testContext));
+
+ // let the concurrent mayhem run for a bit
+ Thread.sleep(60000);
+
+ // let the threads know to exit
+ testContext.keepRunning.set(false);
+
+ // each writer must have added some data and deleted some data
+ for (Future<WriteStats> writeTask : writeTasks) {
+ var stats = writeTask.get();
+ log.debug(String.format("Wrote:%,d Bulk imported:%,d Deleted:%,d Bulk
deleted:%,d",
+ stats.written, stats.bulkImported, stats.deleted,
stats.bulkDeleted));
+ assertTrue(stats.written + stats.bulkImported > 0);
+ assertTrue(stats.deleted + stats.bulkDeleted > 0);
+ }
+
+ for (Future<ScanStats> scanTask : scanTasks) {
+ var stats = scanTask.get();
+ log.debug(String.format("Scanned:%,d verified:%,d", stats.scanned,
stats.verified));
+ assertTrue(stats.verified > 0);
+ // These scans were running concurrently with writes, so a scan will
see more data than what
+ // was written before the scan started.
+ assertTrue(stats.scanned > stats.verified);
+ }
+
+ // get() also surfaces any exception thrown by the table operations task
+ log.debug(tableOpsTask.get());
+
+ // all tasks have finished; scan everything once without and once with isolation
+ var stats1 = scanData(testContext, new Range(), false);
+ var stats2 = scanData(testContext, new Range(), true);
+ log.debug(
+ String.format("Final scan, scanned:%,d verified:%,d",
stats1.scanned, stats1.verified));
+ assertTrue(stats1.verified > 0);
+ // Should see all expected data now that there are no concurrent writes
happening
+ assertEquals(stats1.scanned, stats1.verified);
+ assertEquals(stats2.scanned, stats1.scanned);
+ assertEquals(stats2.verified, stats1.verified);
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ /**
+ * Tracks what data has been written and deleted in an Accumulo table.
+ */
+ private static class DataTracker {
+ // ConcurrentLinkedQueue was chosen because its safe to iterate over
concurrently w/o locking
+ private final Queue<DataSet> dataSets = new ConcurrentLinkedQueue<>();
+
+ /**
+ * Reserves data for scan so that it will not be deleted and returns the
data that is expected
+ * to exist in the table at this point.
+ */
+ public ExpectedScanData beginScan() {
+ List<DataSet> reservedData = new ArrayList<>();
+
+ for (var dataSet : dataSets) {
+ if (dataSet.reserveForScan()) {
+ reservedData.add(dataSet);
+ }
+ }
+
+ return new ExpectedScanData(reservedData);
+ }
+
+ /**
+ * add new data that scans should expect to see
+ */
+ public void addExpectedData(List<Mutation> data) {
+ dataSets.add(new DataSet(data));
+ }
+
+ /**
+ * @return data to delete from the table that is not reserved for scans
+ */
+ public Collection<Mutation> getDeletes() {
+ DataSet dataSet = dataSets.poll();
+
+ if (dataSet == null) {
+ return List.of();
+ }
+
+ dataSet.reserveForDelete();
+
+ return Collections2.transform(dataSet.data, m -> {
+ Mutation delMutation = new Mutation(m.getRow());
+ m.getUpdates()
+ .forEach(cu -> delMutation.putDelete(cu.getColumnFamily(),
cu.getColumnQualifier()));
+ return delMutation;
+ });
+ }
+
+ public long estimatedRows() {
+ return dataSets.stream().mapToLong(ds -> ds.data.size()).sum();
+ }
+ }
+
+ /**
+  * A group of mutations written together, plus the bookkeeping that lets scans reserve the data
+  * and makes a delete wait until no scan holds a reservation.
+  */
+ private static class DataSet {
+   private final List<Mutation> data;
+
+   // number of scans currently holding a reservation; only touched in synchronized methods
+   private int activeScans = 0;
+
+   // set once a delete has claimed this data set; only touched in synchronized methods
+   private boolean deleting = false;
+
+   public DataSet(List<Mutation> data) {
+     this.data = data;
+   }
+
+   /**
+    * @return true if the reservation was taken, false if a delete already claimed this data
+    */
+   synchronized boolean reserveForScan() {
+     if (deleting) {
+       return false;
+     }
+
+     activeScans++;
+
+     return true;
+   }
+
+   /**
+    * Releases one scan reservation and wakes a waiting delete once none remain.
+    */
+   synchronized void unreserveForScan() {
+     activeScans--;
+     Preconditions.checkState(activeScans >= 0);
+     if (activeScans == 0) {
+       notify();
+     }
+   }
+
+   /**
+    * Claims this data set for deletion, which stops new scan reservations, then blocks until all
+    * existing scan reservations are released. May be called at most once per data set.
+    *
+    * @throws RuntimeException if the thread is interrupted while waiting; the thread's interrupt
+    *         status is restored before throwing
+    */
+   synchronized void reserveForDelete() {
+     Preconditions.checkState(!deleting);
+     deleting = true;
+     while (activeScans > 0) {
+       try {
+         wait(50);
+       } catch (InterruptedException e) {
+         // Restore the interrupt status so code further up the stack can still observe it.
+         Thread.currentThread().interrupt();
+         throw new RuntimeException(e);
+       }
+     }
+   }
+
+   /**
+    * @return the keys of this data set that fall inside the given range
+    */
+   Stream<Key> getExpectedData(Range range) {
+     return data.stream().flatMap(ScanConsistencyIT::toKeys).filter(range::contains);
+   }
+ }
+
+ /**
+  * Turns every column update of a mutation into a key for the mutation's row. The timestamp of
+  * each key is set to zero.
+  */
+ private static Stream<Key> toKeys(Mutation m) {
+   var updates = m.getUpdates();
+   return updates.stream().map(update -> {
+     Key key = new Key(m.getRow(), update.getColumnFamily(), update.getColumnQualifier(),
+         update.getColumnVisibility(), 0L, update.isDeleted(), false);
+     return key;
+   });
+ }
+
+ /**
+  * The data sets reserved for a single scan. Closing releases every reservation.
+  */
+ private static class ExpectedScanData implements AutoCloseable {
+
+   private final List<DataSet> reservedData;
+
+   public ExpectedScanData(List<DataSet> reservedData) {
+     this.reservedData = reservedData;
+   }
+
+   /**
+    * @return keys inside the range that are expected to exist in the accumulo table
+    */
+   Stream<Key> getExpectedData(Range range) {
+     Stream<DataSet> reserved = reservedData.stream();
+     return reserved.flatMap(dataSet -> dataSet.getExpectedData(range));
+   }
+
+   @Override
+   public void close() {
+     for (DataSet dataSet : reservedData) {
+       dataSet.unreserveForScan();
+     }
+   }
+ }
+
+ /**
+  * State shared by every task of the test.
+  */
+ private static class TestContext {
+   // client and name of the table the tasks operate on
+   final AccumuloClient client;
+   final String table;
+   // file system and directory used when creating bulk import directories
+   final FileSystem fileSystem;
+   private final String tmpDir;
+   // what data scans should currently expect to see
+   final DataTracker dataTracker = new DataTracker();
+   // tasks keep looping while this is true
+   final AtomicBoolean keepRunning = new AtomicBoolean(true);
+   // hands out a distinct generation number for each generated data set
+   final AtomicLong generationCounter = new AtomicLong(0);
+
+   private TestContext(AccumuloClient client, String table, FileSystem fs, String tmpDir) {
+     this.tmpDir = tmpDir;
+     this.fileSystem = fs;
+     this.table = table;
+     this.client = client;
+   }
+ }
+
+ /**
+  * Counters for scans: how many entries were read and how many of those were expected ones.
+  */
+ private static class ScanStats {
+   // every entry returned by a scan
+   long scanned;
+   // entries that matched a key the scan was expected to see
+   long verified;
+
+   /**
+    * Adds the counters of another instance to this one.
+    */
+   public void add(ScanStats stats) {
+     this.scanned = this.scanned + stats.scanned;
+     this.verified = this.verified + stats.verified;
+   }
+ }
+
+ /**
+  * Reads everything the scanner returns and checks it against the expected keys. Each key read
+  * has its timestamp set to zero before it is looked up, and matching keys are removed from the
+  * expected set. Fails the test when expected keys are left over after the scan.
+  *
+  * @return how many entries were read and how many of them were expected
+  */
+ private static ScanStats scan(ScannerBase scanner, Set<Key> expected) {
+   ScanStats result = new ScanStats();
+   var entries = scanner.iterator();
+   while (entries.hasNext()) {
+     var entry = entries.next();
+     result.scanned++;
+     Key seen = entry.getKey();
+     seen.setTimestamp(0);
+     boolean wasExpected = expected.remove(seen);
+     if (wasExpected) {
+       result.verified++;
+     }
+   }
+
+   assertTrue(expected.isEmpty());
+   return result;
+ }
+
+ /**
+  * Reserves the currently expected data, scans the range with a batch scanner, and verifies
+  * that all of the expected keys inside the range were seen.
+  */
+ // TODO create multiple ranges for batch scanner
+ private static ScanStats batchScanData(TestContext tctx, Range range) throws Exception {
+   try (ExpectedScanData reserved = tctx.dataTracker.beginScan();
+       BatchScanner batchScanner = tctx.client.createBatchScanner(tctx.table)) {
+     Stream<Key> expectedKeys = reserved.getExpectedData(range);
+     Set<Key> mustSee = expectedKeys.collect(Collectors.toSet());
+     batchScanner.setRanges(List.of(range));
+     return scan(batchScanner, mustSee);
+   }
+ }
+
+ /**
+  * Reserves the currently expected data, scans the range with a scanner, and verifies that all
+  * of the expected keys inside the range were seen.
+  *
+  * @param scanIsolated when true the scanner is wrapped in an IsolatedScanner
+  */
+ private static ScanStats scanData(TestContext tctx, Range range, boolean scanIsolated)
+     throws Exception {
+   try (ExpectedScanData reserved = tctx.dataTracker.beginScan();
+       Scanner plainScanner = tctx.client.createScanner(tctx.table)) {
+     Stream<Key> expectedKeys = reserved.getExpectedData(range);
+     Set<Key> mustSee = expectedKeys.collect(Collectors.toSet());
+     plainScanner.setRange(range);
+
+     Scanner scannerToUse = scanIsolated ? new IsolatedScanner(plainScanner) : plainScanner;
+     return scan(scannerToUse, mustSee);
+   }
+ }
+
+ private static class ScanTask implements Callable<ScanStats> {
+
+ private final TestContext tctx;
+
+ private ScanTask(TestContext testContext) {
+ this.tctx = testContext;
+ }
+
+ @SuppressFBWarnings(value = {"PREDICTABLE_RANDOM",
"DMI_RANDOM_USED_ONLY_ONCE"},
+ justification = "predictable random is ok for testing")
+ @Override
+ public ScanStats call() throws Exception {
+ ScanStats allStats = new ScanStats();
+
+ Random random = new Random();
+
+ while (tctx.keepRunning.get()) {
+
+ Range range;
+ if (random.nextInt(10) == 0) {
+ // 1 in 10 chance of doing a full table scan
+ range = new Range();
+ } else {
+ long start = nextLongAbs(random);
+ long end = nextLongAbs(random);
+
+ while (end <= start) {
+ end = nextLongAbs(random);
+ }
+
+ range = new Range(String.format("%016x", start),
String.format("%016x", end));
+ }
+
+ int scanChance = random.nextInt(3);
+ if (scanChance == 0) {
+ allStats.add(scanData(tctx, range, false));
+ } else if (scanChance == 1) {
+ allStats.add(scanData(tctx, range, true));
+ } else {
+ allStats.add(batchScanData(tctx, range));
+ }
+ }
+
+ return allStats;
+ }
+ }
+
+ /**
+  * Counters returned by a write task.
+  */
+ private static class WriteStats {
+   // added to with the result of writing new data through a batch writer
+   long written;
+   // added to with the result of writing deletes through a batch writer
+   long deleted;
+   // added to with the result of bulk importing new data
+   long bulkImported;
+   // added to with the result of bulk importing deletes
+   long bulkDeleted;
+ }
+
+ private static class WriteTask implements Callable<WriteStats> {
+
+ private final TestContext tctx;
+
+ private WriteTask(TestContext testContext) {
+ this.tctx = testContext;
+ }
+
+ /**
+  * Writes the sorted keys of the mutations, each with an empty value, to one rfile inside a new
+  * directory under the test's temporary directory and bulk imports that directory into the
+  * table. The directory is deleted afterwards whether or not the import succeeded.
+  *
+  * @return the number of keys written to the rfile, or zero when there are no mutations
+  */
+ private long bulkImport(Random random, Collection<Mutation> mutations) throws Exception {
+   if (mutations.isEmpty()) {
+     return 0;
+   }
+
+   String dirName = tctx.tmpDir + "/bulkimport_" + nextLongAbs(random);
+   Path bulkDir = new Path(dirName);
+
+   Stream<Key> unsortedKeys = mutations.stream().flatMap(ScanConsistencyIT::toKeys);
+   List<Key> sortedKeys = unsortedKeys.sorted().collect(Collectors.toList());
+
+   Value emptyValue = new Value();
+   try {
+     tctx.fileSystem.mkdirs(bulkDir);
+     String rfile = bulkDir + "/f1.rf";
+     try (RFileWriter rfileWriter =
+         RFile.newWriter().to(rfile).withFileSystem(tctx.fileSystem).build()) {
+       rfileWriter.startDefaultLocalityGroup();
+       for (Key sortedKey : sortedKeys) {
+         rfileWriter.append(sortedKey, emptyValue);
+       }
+     }
+
+     var tableOps = tctx.client.tableOperations();
+     tableOps.importDirectory(bulkDir.toString()).to(tctx.table).tableTime(true).load();
+   } finally {
+     tctx.fileSystem.delete(bulkDir, true);
+   }
+
+   return sortedKeys.size();
+ }
+
+ /**
+  * Adds the mutations to the table through a batch writer that is closed before returning.
+  *
+  * @return the sum of Mutation.size() over all mutations added
+  */
+ private long write(Iterable<Mutation> mutations) throws Exception {
+   long total = 0;
+   try (var batchWriter = tctx.client.createBatchWriter(tctx.table)) {
+     var pending = mutations.iterator();
+     while (pending.hasNext()) {
+       Mutation next = pending.next();
+       batchWriter.addMutation(next);
+       total += next.size();
+     }
+   }
+
+   return total;
+ }
+
+ /**
+ * Until told to stop, either deletes one data set taken from the data tracker (when the tracker
+ * holds more than 2,000 rows) or generates and adds a new data set of 0 to 999 rows. In both
+ * cases roughly one in five operations goes through a bulk import and the rest through a batch
+ * writer.
+ *
+ * @return totals of what write() and bulkImport() reported for added and deleted data
+ */
+ @SuppressFBWarnings(value = {"PREDICTABLE_RANDOM",
"DMI_RANDOM_USED_ONLY_ONCE"},
+ justification = "predictable random is ok for testing")
+ @Override
+ public WriteStats call() throws Exception {
+ WriteStats stats = new WriteStats();
+
+ Random random = new Random();
+
+ while (tctx.keepRunning.get()) {
+
+ // Each row has on average of 50 cols, so 2K rows would be around
100K entries.
+ if (tctx.dataTracker.estimatedRows() > 100_000 / 50) {
+ // There is a good bit of data, so delete some.
+ if (random.nextInt(5) == 0) {
+ // Get the data to delete from the data tracker before flushing,
that way its known to
+ // be completely written and in memory or bulk imported.
+ var deletes = tctx.dataTracker.getDeletes();
+ // When bulk importing deletes must flush before the bulk import
for the case where the
+ // data being deleted is in memory. The bulk import may cause a
full system compaction
+ // which will drop delete keys that suppress the in memory data.
Once the deletes are
+ // dropped, the in memory data is no longer deleted.
+ tctx.client.tableOperations().flush(tctx.table, null, null, true);
+ stats.bulkDeleted += bulkImport(random, deletes);
+ } else {
+ stats.deleted += write(tctx.dataTracker.getDeletes());
+ }
+ } else {
+
+ List<Mutation> dataAdded = new ArrayList<>();
+
+ // every data set gets its own generation, which becomes part of each row it contains
+ long generation = tctx.generationCounter.getAndIncrement();
+
+ int rowsToGenerate = random.nextInt(1000);
+
+ // random row prefixes already used in this generation, passed on to avoid duplicates
+ Set<Long> seen = new HashSet<>();
+
+ for (int i = 0; i < rowsToGenerate; i++) {
+ Mutation m = generateMutation(random, generation, seen);
+ dataAdded.add(m);
+ }
+
+ if (random.nextInt(5) == 0) {
+ stats.bulkImported += bulkImport(random, dataAdded);
+ } else {
+ stats.written += write(dataAdded);
+ }
+
+ // Make the data just written visible to scans. Now that the writer
is closed scans should
+ // be able to see the data in the table.
+ tctx.dataTracker.addExpectedData(dataAdded);
+ }
+ }
+
+ return stats;
+ }
+
+ /**
+  * Creates a mutation for a new row of the form {@code <random long>:<generation>}, both as 16
+  * hex digits, holding between 1 and 100 columns with empty values.
+  *
+  * @param seen random longs already used for rows of this generation; the one chosen is added
+  */
+ private Mutation generateMutation(Random random, long generation, Set<Long> seen) {
+
+   int cols = random.nextInt(100) + 1;
+
+   // The generation is unique for each data set, so rows that are unique inside a generation
+   // are globally unique across all data sets generated for the test. The seen set provides the
+   // uniqueness inside the generation. The way the test works, duplicates could cause false
+   // positives. Even though duplicates are highly unlikely, this avoids a potential bug in the
+   // test itself that could be hard to track down if it actually happened.
+   long nextRow;
+   do {
+     nextRow = nextLongAbs(random);
+   } while (!seen.add(nextRow));
+
+   Mutation m = new Mutation(String.format("%016x:%016x", nextRow, generation));
+
+   // Using the column index as the qualifier makes the qualifiers unique in the row. Together
+   // with globally unique rows this makes every key globally unique in the test.
+   int col = 0;
+   while (col < cols) {
+     String family = String.valueOf(random.nextInt(10));
+     String qualifier = String.format("%04x", col);
+     m.put(family, qualifier, "");
+     col++;
+   }
+
+   return m;
+ }
+ }
+
+ public static class GenerationFilter extends Filter {
+
+ private String generation;
+
+ @Override
+ public void init(SortedKeyValueIterator<Key,Value> source,
Map<String,String> options,
+ IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+ this.generation = options.get("generation");
+ }
+
+ @Override
+ public boolean accept(Key k, Value v) {
+ String kgen = k.getRowData().toString().split(":")[1];
+ return !generation.equals(kgen);
+ }
+ }
+
+ /**
+  * Task that, once a second until told to stop, may flush, compact, split, merge, or run a
+  * filter compaction on the test table.
+  */
+ private static class TableOpsTask implements Callable<String> {
+   private final TestContext tctx;
+
+   private TableOpsTask(TestContext testContext) {
+     this.tctx = testContext;
+   }
+
+   /**
+    * Sleeps one second per iteration and then picks at most one table operation at random; for
+    * 70 of the 100 possible picks no operation is run.
+    *
+    * @return a summary of how many operations of each kind were issued
+    */
+   @SuppressFBWarnings(value = {"PREDICTABLE_RANDOM", "DMI_RANDOM_USED_ONLY_ONCE"},
+       justification = "predictable random is ok for testing")
+   @Override
+   public String call() throws Exception {
+     int numFlushes = 0;
+     int numCompactions = 0;
+     int numSplits = 0;
+     int numMerges = 0;
+     int numFilters = 0;
+
+     Random random = new Random();
+
+     while (tctx.keepRunning.get()) {
+       Thread.sleep(1000);
+
+       int pick = random.nextInt(100);
+
+       if (pick < 10) {
+         // 1 in 10 chance of flushing
+         tctx.client.tableOperations().flush(tctx.table, null, null, random.nextBoolean());
+         numFlushes++;
+       } else if (pick < 15) {
+         // 1 in 20 chance of compacting
+         tctx.client.tableOperations().compact(tctx.table,
+             new CompactionConfig().setFlush(random.nextBoolean()).setWait(random.nextBoolean()));
+         numCompactions++;
+       } else if (pick < 20) {
+         // 1 in 20 chance of splitting
+         int splitsToAdd = random.nextInt(10);
+         TreeSet<Text> splits = new TreeSet<>();
+
+         for (int i = 0; i < splitsToAdd; i++) {
+           splits.add(new Text(String.format("%016x", nextLongAbs(random))));
+         }
+
+         tctx.client.tableOperations().addSplits(tctx.table, splits);
+         numSplits += splitsToAdd;
+       } else if (pick < 25) {
+         // 1 in 20 chance of merging
+         long start = nextLongAbs(random);
+         long end = nextLongAbs(random);
+
+         while (end <= start) {
+           end = nextLongAbs(random);
+         }
+
+         tctx.client.tableOperations().merge(tctx.table, new Text(String.format("%016x", start)),
+             new Text(String.format("%016x", end)));
+         numMerges++;
+       } else if (pick < 30) {
+         // 1 in 20 chance of doing a filter compaction. This compaction will delete a data set.
+         var deletes = tctx.dataTracker.getDeletes();
+
+         // The tracker returns nothing when it has no data set to hand out, and a writer can
+         // produce a data set with zero rows. In both cases there is no generation to filter on
+         // and onlyElement() below would throw, so skip the compaction.
+         if (!deletes.isEmpty()) {
+           // The row has the format <random long>:<generation>, the following gets the
+           // generations from the rows. Expect the generation to be the same for a set of data
+           // to delete.
+           String gen = deletes.stream().map(m -> new String(m.getRow(), UTF_8))
+               .map(row -> row.split(":")[1]).distinct().collect(MoreCollectors.onlyElement());
+
+           IteratorSetting iterSetting =
+               new IteratorSetting(100, "genfilter", GenerationFilter.class);
+           iterSetting.addOptions(Map.of("generation", gen));
+
+           // run a compaction that deletes every key with the specified generation. Must wait on
+           // the compaction because at the end of the test it will try to verify deleted data
+           // is not present. Must flush the table in case data to delete is still in memory.
+           tctx.client.tableOperations().compact(tctx.table, new CompactionConfig().setFlush(true)
+               .setWait(true).setIterators(List.of(iterSetting)));
+           numFilters++;
+         }
+       }
+     }
+
+     return String.format(
+         "Flushes:%,d Compactions:%,d Splits added:%,d Merges:%,d Filter compactions:%,d",
+         numFlushes, numCompactions, numSplits, numMerges, numFilters);
+   }
+ }
+
+ /**
+  * Draws the next long from the given generator and clears its sign bit.
+  *
+  * @param r the random number generator to draw from
+  * @return a random long that is never negative
+  */
+ private static long nextLongAbs(Random r) {
+   // Masking off the sign bit also handles Long.MIN_VALUE, which Math.abs would leave negative.
+   return r.nextLong() & 0x7fffffffffffffffL;
+ }
+
+ /**
+ * This function was created to help debug issues with this test, leaving in
case its useful in
+ * the future. It finds all rfiles in a directory that contain a given row.
+ */
+ public static void findRow(String row, String tableDir) throws Exception {
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+
+ var iter = fs.listFiles(new Path(tableDir), true);
+
+ while (iter.hasNext()) {
+ var f = iter.next();
+ if (f.isFile() && f.getPath().getName().endsWith(".rf")) {
+ // Calling withoutSystemIterators() disables filtering of delete keys
allowing files that
+ // only contain deletes for the row to be found.
+ try (var scanner =
RFile.newScanner().from(f.getPath().toString()).withFileSystem(fs)
+ .withoutSystemIterators().build()) {
+ scanner.setRange(new Range(new Text(row)));
+
+ var siter = scanner.iterator();
+
+ if (siter.hasNext()) {
+ System.out.println("File " + f.getPath().getName());
+ var e = siter.next();
+ System.out.println(" " + e.getKey() + " " + e.getValue());
+ }
+ }
+ }
+ }
+ }
+}