This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 8c65e026be centralizes wrapping per file iterators based on metadata (#3259)
8c65e026be is described below
commit 8c65e026be2db633d8f2a622aabf05db1c2d6c15
Author: Keith Turner <[email protected]>
AuthorDate: Fri Mar 31 14:56:18 2023 -0400
centralizes wrapping per file iterators based on metadata (#3259)
---
.../core/metadata/schema/DataFileValue.java | 21 +++++++++++++++++++++
.../accumulo/server/compaction/FileCompactor.java | 10 ++++------
.../org/apache/accumulo/server/fs/FileManager.java | 10 ++++------
3 files changed, 29 insertions(+), 12 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java
index 67c5e1518a..fe770d96c0 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/DataFileValue.java
@@ -21,6 +21,8 @@ package org.apache.accumulo.core.metadata.schema;
import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iteratorsImpl.system.InterruptibleIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.TimeSettingIterator;
public class DataFileValue {
private long size;
@@ -114,4 +116,23 @@ public class DataFileValue {
}
this.time = time;
}
+
+ /**
+ * Reports whether {@link #wrapFileIterator} would wrap a given iterator. Callers can use this to
+ * skip looking up per file metadata when no file's iterator needs wrapping. Currently this is
+ * true exactly when a time is set; it must stay in sync with the conditions checked in
+ * {@link #wrapFileIterator}.
+ *
+ * @return true if {@link #wrapFileIterator} would wrap a given iterator, false otherwise.
+ */
+ public boolean willWrapIterator() {
+ return isTimeSet();
+ }
+
+ /**
+ * Use per file information from the metadata table to wrap the raw iterator over a file with
+ * iterators that may take action based on data set in the metadata table.
+ *
+ * <p>Currently the only wrapping applied is a {@link TimeSettingIterator}. It is added when a
+ * time is set for the file (see {@link #isTimeSet()}) and is constructed with
+ * {@link #getTime()}. When no wrapping applies, {@code iter} is returned unchanged.
+ * {@link #willWrapIterator()} must report the same conditions checked here.
+ *
+ * @param iter the raw iterator over the file
+ * @return {@code iter} wrapped as needed, or {@code iter} itself if no wrapping applies
+ */
+ public InterruptibleIterator wrapFileIterator(InterruptibleIterator iter) {
+ if (isTimeSet()) {
+ return new TimeSettingIterator(iter, getTime());
+ } else {
+ return iter;
+ }
+ }
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index f8ba0011c9..3b6b84a7d0 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@ -52,8 +52,8 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil;
import
org.apache.accumulo.core.iteratorsImpl.system.ColumnFamilySkippingIterator;
import org.apache.accumulo.core.iteratorsImpl.system.DeletingIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.InterruptibleIterator;
import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
-import org.apache.accumulo.core.iteratorsImpl.system.TimeSettingIterator;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
@@ -344,12 +344,10 @@ public class FileCompactor implements Callable<CompactionStats> {
readers.add(reader);
- SortedKeyValueIterator<Key,Value> iter = new
ProblemReportingIterator(context,
- extent.tableId(), mapFile.getPathStr(), false, reader);
+ InterruptibleIterator iter = new ProblemReportingIterator(context,
extent.tableId(),
+ mapFile.getPathStr(), false, reader);
- if (filesToCompact.get(mapFile).isTimeSet()) {
- iter = new TimeSettingIterator(iter,
filesToCompact.get(mapFile).getTime());
- }
+ iter = filesToCompact.get(mapFile).wrapFileIterator(iter);
iters.add(iter);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
index 2284c81666..b5dce00ec0 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
@@ -46,7 +46,6 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iteratorsImpl.system.InterruptibleIterator;
import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator;
import
org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator.DataSource;
-import org.apache.accumulo.core.iteratorsImpl.system.TimeSettingIterator;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
@@ -509,7 +508,8 @@ public class FileManager {
ArrayList<InterruptibleIterator> iters = new ArrayList<>();
- boolean sawTimeSet =
files.values().stream().anyMatch(DataFileValue::isTimeSet);
+ boolean someIteratorsWillWrap =
+ files.values().stream().anyMatch(DataFileValue::willWrapIterator);
for (Entry<FileSKVIterator,String> entry :
newlyReservedReaders.entrySet()) {
FileSKVIterator source = entry.getKey();
@@ -526,12 +526,10 @@ public class FileManager {
iter = new ProblemReportingIterator(context, tablet.tableId(),
filename, continueOnFailure,
detachable ? getSsi(filename, source) : source);
- if (sawTimeSet) {
+ if (someIteratorsWillWrap) {
// constructing FileRef is expensive so avoid if not needed
DataFileValue value = files.get(new TabletFile(new Path(filename)));
- if (value.isTimeSet()) {
- iter = new TimeSettingIterator(iter, value.getTime());
- }
+ iter = value.wrapFileIterator(iter);
}
iters.add(iter);