This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new d734acc00e0 HBASE-27153 Improvements to read-path tracing
d734acc00e0 is described below
commit d734acc00e0bd98a62f00aaddc36c3bd90bf4e5b
Author: Nick Dimiduk <[email protected]>
AuthorDate: Thu Jun 23 14:20:39 2022 +0200
HBASE-27153 Improvements to read-path tracing
Signed-off-by: Andrew Purtell <[email protected]>
---
.../client/trace/hamcrest/AttributesMatchers.java | 9 +
.../HFileContextAttributesBuilderConsumer.java | 110 +++++++++
.../apache/hadoop/hbase/io/util/BlockIOUtils.java | 271 +++++++++++++++------
.../hbase/trace/HBaseSemanticAttributes.java | 61 +++++
.../apache/hadoop/hbase/io/hfile/HFileBlock.java | 120 +++++----
.../hadoop/hbase/io/hfile/HFileReaderImpl.java | 17 +-
.../hadoop/hbase/io/hfile/PrefetchExecutor.java | 22 +-
.../hadoop/hbase/io/hfile/TestBlockIOUtils.java | 210 ++++++++++++----
.../apache/hadoop/hbase/io/hfile/TestPrefetch.java | 40 ++-
9 files changed, 669 insertions(+), 191 deletions(-)
diff --git
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java
index aec96897179..f71defb329e 100644
---
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java
+++
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client.trace.hamcrest;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.is;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
@@ -54,6 +55,10 @@ public final class AttributesMatchers {
return containsEntry(AttributeKey.stringKey(key), value);
}
+ public static Matcher<Attributes> containsEntry(String key, long value) {
+ return containsEntry(AttributeKey.longKey(key), value);
+ }
+
public static Matcher<Attributes> containsEntryWithStringValuesOf(String
key, String... values) {
return containsEntry(AttributeKey.stringArrayKey(key),
Arrays.asList(values));
}
@@ -63,6 +68,10 @@ public final class AttributesMatchers {
return new
IsAttributesContaining<>(equalTo(AttributeKey.stringArrayKey(key)), matcher);
}
+ public static Matcher<Attributes> isEmpty() {
+ return hasProperty("empty", is(true));
+ }
+
private static final class IsAttributesContaining<T> extends
TypeSafeMatcher<Attributes> {
private final Matcher<AttributeKey<? super T>> keyMatcher;
private final Matcher<? super T> valueMatcher;
diff --git
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/trace/HFileContextAttributesBuilderConsumer.java
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/trace/HFileContextAttributesBuilderConsumer.java
new file mode 100644
index 00000000000..663a745cc45
--- /dev/null
+++
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/trace/HFileContextAttributesBuilderConsumer.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.trace;
+
+import static
org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.CHECKSUM_KEY;
+import static
org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.COMPRESSION_ALGORITHM_KEY;
+import static
org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DATA_BLOCK_ENCODING_KEY;
+import static
org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ENCRYPTION_CIPHER_KEY;
+import static
org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.HFILE_NAME_KEY;
+import static
org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.READ_TYPE_KEY;
+
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.ContextKey;
+import java.util.Objects;
+import java.util.function.Consumer;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ReadType;
+import org.apache.hadoop.hbase.util.ChecksumType;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * <p>
+ * Populate fields on an {@link AttributesBuilder} based on an {@link
HFileContext}. Passed around
+ * inside an active {@link Context}, indexed under {@link #CONTEXT_KEY}. The
class is designed such
+ * that calls to the {@link #accept(AttributesBuilder)} method are idempotent
with regards to the
+ * instance of this class.
+ * </p>
+ * <p>
+ * The true and truly ridiculous class name should be something more like
+ * {@code HFileContext_ContextAttributes_AttributesBuilder_Consumer}.
+ * </p>
+ */
[email protected]
+public class HFileContextAttributesBuilderConsumer implements
Consumer<AttributesBuilder> {
+
+ /**
+ * Used to place extract attributes pertaining to the {@link HFileContext}
that scopes the active
+ * {@link Context}.
+ */
+ public static final ContextKey<Consumer<AttributesBuilder>> CONTEXT_KEY =
+ ContextKey.named("db.hbase.io.hfile.context_attributes");
+
+ private final HFileContext hFileContext;
+
+ private boolean skipChecksum = false;
+ private ReadType readType = null;
+
+ public HFileContextAttributesBuilderConsumer(final HFileContext
hFileContext) {
+ this.hFileContext = Objects.requireNonNull(hFileContext);
+ }
+
+ /**
+ * Specify that the {@link ChecksumType} should not be included in the
attributes.
+ */
+ public HFileContextAttributesBuilderConsumer setSkipChecksum(final boolean
skipChecksum) {
+ this.skipChecksum = skipChecksum;
+ return this;
+ }
+
+ /**
+ * Specify the {@link ReadType} involced in this IO operation.
+ */
+ public HFileContextAttributesBuilderConsumer setReadType(final ReadType
readType) {
+ // TODO: this is not a part of the HFileBlock, its context of the
operation. Should track this
+ // detail elsewhere.
+ this.readType = readType;
+ return this;
+ }
+
+ @Override
+ public void accept(AttributesBuilder builder) {
+ if (hFileContext.getHFileName() != null) {
+ builder.put(HFILE_NAME_KEY, hFileContext.getHFileName());
+ }
+ if (hFileContext.getCompression() != null) {
+ builder.put(COMPRESSION_ALGORITHM_KEY,
hFileContext.getCompression().getName());
+ }
+ if (hFileContext.getDataBlockEncoding() != null) {
+ builder.put(DATA_BLOCK_ENCODING_KEY,
hFileContext.getDataBlockEncoding().name());
+ }
+ if (
+ hFileContext.getEncryptionContext() != null
+ && hFileContext.getEncryptionContext().getCipher() != null
+ ) {
+ builder.put(ENCRYPTION_CIPHER_KEY,
hFileContext.getEncryptionContext().getCipher().getName());
+ }
+ if (!skipChecksum && hFileContext.getChecksumType() != null) {
+ builder.put(CHECKSUM_KEY, hFileContext.getChecksumType().getName());
+ }
+ if (readType != null) {
+ builder.put(READ_TYPE_KEY, readType.name());
+ }
+ }
+}
diff --git
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java
index 1720cae2300..86c6317556b 100644
---
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java
+++
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/BlockIOUtils.java
@@ -17,13 +17,22 @@
*/
package org.apache.hadoop.hbase.io.util;
+import static
org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DIRECT_BYTES_READ_KEY;
+import static
org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.HEAP_BYTES_READ_KEY;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
+import java.util.Optional;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;
+import
org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.io.IOUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -76,33 +85,48 @@ public final class BlockIOUtils {
* @throws IOException exception to throw if any error happen
*/
public static void readFully(ByteBuff buf, FSDataInputStream dis, int
length) throws IOException {
+ final Span span = Span.current();
+ final AttributesBuilder attributesBuilder =
builderFromContext(Context.current());
if (!isByteBufferReadable(dis)) {
// If InputStream does not support the ByteBuffer read, just read to
heap and copy bytes to
// the destination ByteBuff.
byte[] heapBuf = new byte[length];
IOUtils.readFully(dis, heapBuf, 0, length);
+ annotateHeapBytesRead(attributesBuilder, length);
+ span.addEvent("BlockIOUtils.readFully", attributesBuilder.build());
copyToByteBuff(heapBuf, 0, length, buf);
return;
}
+ int directBytesRead = 0, heapBytesRead = 0;
ByteBuffer[] buffers = buf.nioByteBuffers();
int remain = length;
int idx = 0;
ByteBuffer cur = buffers[idx];
- while (remain > 0) {
- while (!cur.hasRemaining()) {
- if (++idx >= buffers.length) {
+ try {
+ while (remain > 0) {
+ while (!cur.hasRemaining()) {
+ if (++idx >= buffers.length) {
+ throw new IOException(
+ "Not enough ByteBuffers to read the reminding " + remain + " " +
"bytes");
+ }
+ cur = buffers[idx];
+ }
+ cur.limit(cur.position() + Math.min(remain, cur.remaining()));
+ int bytesRead = dis.read(cur);
+ if (bytesRead < 0) {
throw new IOException(
- "Not enough ByteBuffers to read the reminding " + remain + " " +
"bytes");
+ "Premature EOF from inputStream, but still need " + remain + " " +
"bytes");
+ }
+ remain -= bytesRead;
+ if (cur.isDirect()) {
+ directBytesRead += bytesRead;
+ } else {
+ heapBytesRead += bytesRead;
}
- cur = buffers[idx];
- }
- cur.limit(cur.position() + Math.min(remain, cur.remaining()));
- int bytesRead = dis.read(cur);
- if (bytesRead < 0) {
- throw new IOException(
- "Premature EOF from inputStream, but still need " + remain + " " +
"bytes");
}
- remain -= bytesRead;
+ } finally {
+ annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead);
+ span.addEvent("BlockIOUtils.readFully", attributesBuilder.build());
}
}
@@ -116,19 +140,28 @@ public final class BlockIOUtils {
*/
public static void readFullyWithHeapBuffer(InputStream in, ByteBuff out, int
length)
throws IOException {
- byte[] buffer = new byte[1024];
if (length < 0) {
throw new IllegalArgumentException("Length must not be negative: " +
length);
}
+ int heapBytesRead = 0;
int remain = length, count;
- while (remain > 0) {
- count = in.read(buffer, 0, Math.min(remain, buffer.length));
- if (count < 0) {
- throw new IOException(
- "Premature EOF from inputStream, but still need " + remain + "
bytes");
+ byte[] buffer = new byte[1024];
+ try {
+ while (remain > 0) {
+ count = in.read(buffer, 0, Math.min(remain, buffer.length));
+ if (count < 0) {
+ throw new IOException(
+ "Premature EOF from inputStream, but still need " + remain + "
bytes");
+ }
+ out.put(buffer, 0, count);
+ remain -= count;
+ heapBytesRead += count;
}
- out.put(buffer, 0, count);
- remain -= count;
+ } finally {
+ final Span span = Span.current();
+ final AttributesBuilder attributesBuilder =
builderFromContext(Context.current());
+ annotateHeapBytesRead(attributesBuilder, heapBytesRead);
+ span.addEvent("BlockIOUtils.readFullyWithHeapBuffer",
attributesBuilder.build());
}
}
@@ -147,20 +180,29 @@ public final class BlockIOUtils {
*/
private static boolean readWithExtraOnHeap(InputStream in, byte[] buf, int
bufOffset,
int necessaryLen, int extraLen) throws IOException {
+ int heapBytesRead = 0;
int bytesRemaining = necessaryLen + extraLen;
- while (bytesRemaining > 0) {
- int ret = in.read(buf, bufOffset, bytesRemaining);
- if (ret < 0) {
- if (bytesRemaining <= extraLen) {
- // We could not read the "extra data", but that is OK.
- break;
+ try {
+ while (bytesRemaining > 0) {
+ int ret = in.read(buf, bufOffset, bytesRemaining);
+ if (ret < 0) {
+ if (bytesRemaining <= extraLen) {
+ // We could not read the "extra data", but that is OK.
+ break;
+ }
+ throw new IOException("Premature EOF from inputStream (read " +
"returned " + ret
+ + ", was trying to read " + necessaryLen + " necessary bytes and "
+ extraLen
+ + " extra bytes, " + "successfully read " + (necessaryLen +
extraLen - bytesRemaining));
}
- throw new IOException("Premature EOF from inputStream (read " +
"returned " + ret
- + ", was trying to read " + necessaryLen + " necessary bytes and " +
extraLen
- + " extra bytes, " + "successfully read " + (necessaryLen + extraLen
- bytesRemaining));
+ bufOffset += ret;
+ bytesRemaining -= ret;
+ heapBytesRead += ret;
}
- bufOffset += ret;
- bytesRemaining -= ret;
+ } finally {
+ final Span span = Span.current();
+ final AttributesBuilder attributesBuilder =
builderFromContext(Context.current());
+ annotateHeapBytesRead(attributesBuilder, heapBytesRead);
+ span.addEvent("BlockIOUtils.readWithExtra", attributesBuilder.build());
}
return bytesRemaining <= 0;
}
@@ -186,27 +228,41 @@ public final class BlockIOUtils {
copyToByteBuff(heapBuf, 0, heapBuf.length, buf);
return ret;
}
+ int directBytesRead = 0, heapBytesRead = 0;
ByteBuffer[] buffers = buf.nioByteBuffers();
int bytesRead = 0;
int remain = necessaryLen + extraLen;
int idx = 0;
ByteBuffer cur = buffers[idx];
- while (bytesRead < necessaryLen) {
- while (!cur.hasRemaining()) {
- if (++idx >= buffers.length) {
- throw new IOException("Not enough ByteBuffers to read the reminding
" + remain + "bytes");
+ try {
+ while (bytesRead < necessaryLen) {
+ while (!cur.hasRemaining()) {
+ if (++idx >= buffers.length) {
+ throw new IOException(
+ "Not enough ByteBuffers to read the reminding " + remain +
"bytes");
+ }
+ cur = buffers[idx];
+ }
+ cur.limit(cur.position() + Math.min(remain, cur.remaining()));
+ int ret = dis.read(cur);
+ if (ret < 0) {
+ throw new IOException("Premature EOF from inputStream (read returned
" + ret
+ + ", was trying to read " + necessaryLen + " necessary bytes and "
+ extraLen
+ + " extra bytes, successfully read " + bytesRead);
+ }
+ bytesRead += ret;
+ remain -= ret;
+ if (cur.isDirect()) {
+ directBytesRead += ret;
+ } else {
+ heapBytesRead += ret;
}
- cur = buffers[idx];
- }
- cur.limit(cur.position() + Math.min(remain, cur.remaining()));
- int ret = dis.read(cur);
- if (ret < 0) {
- throw new IOException("Premature EOF from inputStream (read returned "
+ ret
- + ", was trying to read " + necessaryLen + " necessary bytes and " +
extraLen
- + " extra bytes, successfully read " + bytesRead);
}
- bytesRead += ret;
- remain -= ret;
+ } finally {
+ final Span span = Span.current();
+ final AttributesBuilder attributesBuilder =
builderFromContext(Context.current());
+ annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead);
+ span.addEvent("BlockIOUtils.readWithExtra", attributesBuilder.build());
}
return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
}
@@ -264,15 +320,22 @@ public final class BlockIOUtils {
byte[] buf = new byte[remain];
int bytesRead = 0;
int lengthMustRead = readAllBytes ? remain : necessaryLen;
- while (bytesRead < lengthMustRead) {
- int ret = dis.read(position + bytesRead, buf, bytesRead, remain);
- if (ret < 0) {
- throw new IOException("Premature EOF from inputStream (positional read
returned " + ret
- + ", was trying to read " + necessaryLen + " necessary bytes and " +
extraLen
- + " extra bytes, successfully read " + bytesRead);
+ try {
+ while (bytesRead < lengthMustRead) {
+ int ret = dis.read(position + bytesRead, buf, bytesRead, remain);
+ if (ret < 0) {
+ throw new IOException("Premature EOF from inputStream (positional
read returned " + ret
+ + ", was trying to read " + necessaryLen + " necessary bytes and "
+ extraLen
+ + " extra bytes, successfully read " + bytesRead);
+ }
+ bytesRead += ret;
+ remain -= ret;
}
- bytesRead += ret;
- remain -= ret;
+ } finally {
+ final Span span = Span.current();
+ final AttributesBuilder attributesBuilder =
builderFromContext(Context.current());
+ annotateHeapBytesRead(attributesBuilder, bytesRead);
+ span.addEvent("BlockIOUtils.preadWithExtra", attributesBuilder.build());
}
copyToByteBuff(buf, 0, bytesRead, buff);
return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
@@ -280,39 +343,53 @@ public final class BlockIOUtils {
private static boolean preadWithExtraDirectly(ByteBuff buff,
FSDataInputStream dis, long position,
int necessaryLen, int extraLen, boolean readAllBytes) throws IOException {
+ int directBytesRead = 0, heapBytesRead = 0;
int remain = necessaryLen + extraLen, bytesRead = 0, idx = 0;
ByteBuffer[] buffers = buff.nioByteBuffers();
ByteBuffer cur = buffers[idx];
int lengthMustRead = readAllBytes ? remain : necessaryLen;
- while (bytesRead < lengthMustRead) {
- int ret;
- while (!cur.hasRemaining()) {
- if (++idx >= buffers.length) {
- throw new IOException("Not enough ByteBuffers to read the reminding
" + remain + "bytes");
+ try {
+ while (bytesRead < lengthMustRead) {
+ int ret;
+ while (!cur.hasRemaining()) {
+ if (++idx >= buffers.length) {
+ throw new IOException(
+ "Not enough ByteBuffers to read the reminding " + remain +
"bytes");
+ }
+ cur = buffers[idx];
+ }
+ cur.limit(cur.position() + Math.min(remain, cur.remaining()));
+ try {
+ ret = (Integer) byteBufferPositionedReadMethod.invoke(dis, position
+ bytesRead, cur);
+ } catch (IllegalAccessException e) {
+ throw new IOException("Unable to invoke ByteBuffer positioned read
when trying to read "
+ + bytesRead + " bytes from position " + position, e);
+ } catch (InvocationTargetException e) {
+ throw new IOException("Encountered an exception when invoking
ByteBuffer positioned read"
+ + " when trying to read " + bytesRead + " bytes from position " +
position, e);
+ } catch (NullPointerException e) {
+ throw new IOException("something is null");
+ } catch (Exception e) {
+ throw e;
+ }
+ if (ret < 0) {
+ throw new IOException("Premature EOF from inputStream (positional
read returned " + ret
+ + ", was trying to read " + necessaryLen + " necessary bytes and "
+ extraLen
+ + " extra bytes, successfully read " + bytesRead);
+ }
+ bytesRead += ret;
+ remain -= ret;
+ if (cur.isDirect()) {
+ directBytesRead += bytesRead;
+ } else {
+ heapBytesRead += bytesRead;
}
- cur = buffers[idx];
- }
- cur.limit(cur.position() + Math.min(remain, cur.remaining()));
- try {
- ret = (Integer) byteBufferPositionedReadMethod.invoke(dis, position +
bytesRead, cur);
- } catch (IllegalAccessException e) {
- throw new IOException("Unable to invoke ByteBuffer positioned read
when trying to read "
- + bytesRead + " bytes from position " + position, e);
- } catch (InvocationTargetException e) {
- throw new IOException("Encountered an exception when invoking
ByteBuffer positioned read"
- + " when trying to read " + bytesRead + " bytes from position " +
position, e);
- } catch (NullPointerException e) {
- throw new IOException("something is null");
- } catch (Exception e) {
- throw e;
- }
- if (ret < 0) {
- throw new IOException("Premature EOF from inputStream (positional read
returned " + ret
- + ", was trying to read " + necessaryLen + " necessary bytes and " +
extraLen
- + " extra bytes, successfully read " + bytesRead);
}
- bytesRead += ret;
- remain -= ret;
+ } finally {
+ final Span span = Span.current();
+ final AttributesBuilder attributesBuilder =
builderFromContext(Context.current());
+ annotateBytesRead(attributesBuilder, directBytesRead, heapBytesRead);
+ span.addEvent("BlockIOUtils.preadWithExtra", attributesBuilder.build());
}
return (extraLen > 0) && (bytesRead == necessaryLen + extraLen);
@@ -340,4 +417,38 @@ public final class BlockIOUtils {
}
return len;
}
+
+ /**
+ * Construct a fresh {@link AttributesBuilder} from the provided {@link
Context}, populated with
+ * relevant attributes populated by {@link
HFileContextAttributesBuilderConsumer#CONTEXT_KEY}.
+ */
+ private static AttributesBuilder builderFromContext(Context context) {
+ final AttributesBuilder attributesBuilder = Attributes.builder();
+ Optional.ofNullable(context)
+ .map(val -> val.get(HFileContextAttributesBuilderConsumer.CONTEXT_KEY))
+ .ifPresent(c -> c.accept(attributesBuilder));
+ return attributesBuilder;
+ }
+
+ /**
+ * Conditionally annotate {@code span} with the appropriate attribute when
value is non-zero.
+ */
+ private static void annotateHeapBytesRead(AttributesBuilder
attributesBuilder,
+ int heapBytesRead) {
+ annotateBytesRead(attributesBuilder, 0, heapBytesRead);
+ }
+
+ /**
+ * Conditionally annotate {@code attributesBuilder} with appropriate
attributes when values are
+ * non-zero.
+ */
+ private static void annotateBytesRead(AttributesBuilder attributesBuilder,
long directBytesRead,
+ long heapBytesRead) {
+ if (directBytesRead > 0) {
+ attributesBuilder.put(DIRECT_BYTES_READ_KEY, directBytesRead);
+ }
+ if (heapBytesRead > 0) {
+ attributesBuilder.put(HEAP_BYTES_READ_KEY, heapBytesRead);
+ }
+ }
}
diff --git
a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java
b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java
index 40dfc1dce4f..a629761b87b 100644
---
a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java
+++
b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.trace;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
+import java.nio.ByteBuffer;
import java.util.List;
import org.apache.yetus.audience.InterfaceAudience;
@@ -56,6 +57,66 @@ public final class HBaseSemanticAttributes {
AttributeKey.booleanKey("db.hbase.rowlock.readlock");
public static final AttributeKey<String> WAL_IMPL =
AttributeKey.stringKey("db.hbase.wal.impl");
+ /**
+ * Indicates the amount of data was read into a {@link ByteBuffer} of type
+ * {@link ByteBuffer#isDirect() direct}.
+ */
+ public static final AttributeKey<Long> DIRECT_BYTES_READ_KEY =
+ AttributeKey.longKey("db.hbase.io.direct_bytes_read");
+ /**
+ * Indicates the amount of data was read into a {@link ByteBuffer} not of
type
+ * {@link ByteBuffer#isDirect() direct}.
+ */
+ public static final AttributeKey<Long> HEAP_BYTES_READ_KEY =
+ AttributeKey.longKey("db.hbase.io.heap_bytes_read");
+ /**
+ * Indicates the {@link
org.apache.hadoop.hbase.io.compress.Compression.Algorithm} used to encode
+ * an HFile.
+ */
+ public static final AttributeKey<String> COMPRESSION_ALGORITHM_KEY =
+ AttributeKey.stringKey("db.hbase.io.hfile.data_block_encoding");
+ /**
+ * Indicates the {@link
org.apache.hadoop.hbase.io.encoding.DataBlockEncoding} algorithm used to
+ * encode this HFile.
+ */
+ public static final AttributeKey<String> DATA_BLOCK_ENCODING_KEY =
+ AttributeKey.stringKey("db.hbase.io.hfile.data_block_encoding");
+ /**
+ * Indicates the {@link org.apache.hadoop.hbase.io.crypto.Cipher} used to
encrypt this HFile.
+ */
+ public static final AttributeKey<String> ENCRYPTION_CIPHER_KEY =
+ AttributeKey.stringKey("db.hbase.io.hfile.encryption_cipher");
+ /**
+ * Indicates the {@link org.apache.hadoop.hbase.util.ChecksumType} used to
encode this HFile.
+ */
+ public static final AttributeKey<String> CHECKSUM_KEY =
+ AttributeKey.stringKey("db.hbase.io.hfile.checksum_type");
+ /**
+ * Indicates the name of the HFile accessed.
+ */
+ public static final AttributeKey<String> HFILE_NAME_KEY =
+ AttributeKey.stringKey("db.hbase.io.hfile.file_name");
+ /**
+ * Indicated the type of read.
+ */
+ public static final AttributeKey<String> READ_TYPE_KEY =
+ AttributeKey.stringKey("db.hbase.io.hfile.read_type");
+ /**
+ * Identifies an entry in the Block Cache.
+ */
+ public static final AttributeKey<String> BLOCK_CACHE_KEY_KEY =
+ AttributeKey.stringKey("db.hbase.io.hfile.block_cache_key");
+
+ /**
+ * These values represent the different IO read strategies HBase may employ
for accessing
+ * filesystem data.
+ */
+ public enum ReadType {
+ // TODO: promote this to the FSReader#readBlockData API. Or somehow
instead use Scan.ReadType.
+ POSITIONAL_READ,
+ SEEK_PLUS_READ,
+ }
+
/**
* These are values used with {@link #DB_OPERATION}. They correspond with
the implementations of
* {@code org.apache.hadoop.hbase.client.Operation}, as well as
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index fe2e0c7fab9..f68ffffa94a 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -18,7 +18,13 @@
package org.apache.hadoop.hbase.io.hfile;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
+import static
org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer.CONTEXT_KEY;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
@@ -26,6 +32,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -46,11 +53,13 @@ import
org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import
org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer;
import org.apache.hadoop.hbase.io.util.BlockIOUtils;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
+import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.ReadType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -620,7 +629,9 @@ public class HFileBlock implements Cacheable {
HFileBlock unpacked = shallowClone(this, newBuf);
boolean succ = false;
- try {
+ final Context context =
+ Context.current().with(CONTEXT_KEY, new
HFileContextAttributesBuilderConsumer(fileContext));
+ try (Scope ignored = context.makeCurrent()) {
HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
? reader.getBlockDecodingContext()
: reader.getDefaultBlockDecodingContext();
@@ -1479,59 +1490,65 @@ public class HFileBlock implements Cacheable {
boolean updateMetrics, boolean intoHeap) throws IOException {
// Get a copy of the current state of whether to validate
// hbase checksums or not for this read call. This is not
- // thread-safe but the one constaint is that if we decide
+ // thread-safe but the one constraint is that if we decide
// to skip hbase checksum verification then we are
// guaranteed to use hdfs checksum verification.
boolean doVerificationThruHBaseChecksum =
streamWrapper.shouldUseHBaseChecksum();
FSDataInputStream is =
streamWrapper.getStream(doVerificationThruHBaseChecksum);
-
- HFileBlock blk = readBlockDataInternal(is, offset,
onDiskSizeWithHeaderL, pread,
- doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
- if (blk == null) {
- HFile.LOG.warn("HBase checksum verification failed for file " +
pathName + " at offset "
- + offset + " filesize " + fileSize + ". Retrying read with HDFS
checksums turned on...");
-
- if (!doVerificationThruHBaseChecksum) {
- String msg = "HBase checksum verification failed for file " +
pathName + " at offset "
- + offset + " filesize " + fileSize + " but this cannot happen
because doVerify is "
- + doVerificationThruHBaseChecksum;
- HFile.LOG.warn(msg);
- throw new IOException(msg); // cannot happen case here
- }
- HFile.CHECKSUM_FAILURES.increment(); // update metrics
-
- // If we have a checksum failure, we fall back into a mode where
- // the next few reads use HDFS level checksums. We aim to make the
- // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
- // hbase checksum verification, but since this value is set without
- // holding any locks, it can so happen that we might actually do
- // a few more than precisely this number.
- is =
this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
- doVerificationThruHBaseChecksum = false;
- blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
+ final Context context = Context.current().with(CONTEXT_KEY,
+ new HFileContextAttributesBuilderConsumer(fileContext)
+ .setSkipChecksum(doVerificationThruHBaseChecksum)
+ .setReadType(pread ? ReadType.POSITIONAL_READ :
ReadType.SEEK_PLUS_READ));
+ try (Scope ignored = context.makeCurrent()) {
+ HFileBlock blk = readBlockDataInternal(is, offset,
onDiskSizeWithHeaderL, pread,
doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
- if (blk != null) {
- HFile.LOG.warn("HDFS checksum verification succeeded for file " +
pathName + " at offset "
- + offset + " filesize " + fileSize);
+ if (blk == null) {
+ HFile.LOG.warn("HBase checksum verification failed for file {} at
offset {} filesize {}."
+ + " Retrying read with HDFS checksums turned on...", pathName,
offset, fileSize);
+
+ if (!doVerificationThruHBaseChecksum) {
+ String msg = "HBase checksum verification failed for file " +
pathName + " at offset "
+ + offset + " filesize " + fileSize + " but this cannot happen
because doVerify is "
+ + doVerificationThruHBaseChecksum;
+ HFile.LOG.warn(msg);
+ throw new IOException(msg); // cannot happen case here
+ }
+ HFile.CHECKSUM_FAILURES.increment(); // update metrics
+
+ // If we have a checksum failure, we fall back into a mode where
+ // the next few reads use HDFS level checksums. We aim to make the
+ // next CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD reads avoid
+ // hbase checksum verification, but since this value is set without
+ // holding any locks, it can so happen that we might actually do
+ // a few more than precisely this number.
+ is =
this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
+ doVerificationThruHBaseChecksum = false;
+ blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
+ doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
+ if (blk != null) {
+ HFile.LOG.warn(
+ "HDFS checksum verification succeeded for file {} at offset {}
filesize" + " {}",
+ pathName, offset, fileSize);
+ }
+ }
+ if (blk == null && !doVerificationThruHBaseChecksum) {
+ String msg =
+ "readBlockData failed, possibly due to " + "checksum verification
failed for file "
+ + pathName + " at offset " + offset + " filesize " + fileSize;
+ HFile.LOG.warn(msg);
+ throw new IOException(msg);
}
- }
- if (blk == null && !doVerificationThruHBaseChecksum) {
- String msg =
- "readBlockData failed, possibly due to " + "checksum verification
failed for file "
- + pathName + " at offset " + offset + " filesize " + fileSize;
- HFile.LOG.warn(msg);
- throw new IOException(msg);
- }
- // If there is a checksum mismatch earlier, then retry with
- // HBase checksums switched off and use HDFS checksum verification.
- // This triggers HDFS to detect and fix corrupt replicas. The
- // next checksumOffCount read requests will use HDFS checksums.
- // The decrementing of this.checksumOffCount is not thread-safe,
- // but it is harmless because eventually checksumOffCount will be
- // a negative number.
- streamWrapper.checksumOk();
- return blk;
+ // If there is a checksum mismatch earlier, then retry with
+ // HBase checksums switched off and use HDFS checksum verification.
+ // This triggers HDFS to detect and fix corrupt replicas. The
+ // next checksumOffCount read requests will use HDFS checksums.
+ // The decrementing of this.checksumOffCount is not thread-safe,
+ // but it is harmless because eventually checksumOffCount will be
+ // a negative number.
+ streamWrapper.checksumOk();
+ return blk;
+ }
}
/**
@@ -1629,6 +1646,11 @@ public class HFileBlock implements Cacheable {
throw new IOException("Invalid offset=" + offset + " trying to read "
+ "block (onDiskSize="
+ onDiskSizeWithHeaderL + ")");
}
+
+ final Span span = Span.current();
+ final AttributesBuilder attributesBuilder = Attributes.builder();
+ Optional.of(Context.current()).map(val -> val.get(CONTEXT_KEY))
+ .ifPresent(c -> c.accept(attributesBuilder));
int onDiskSizeWithHeader = checkAndGetSizeAsInt(onDiskSizeWithHeaderL,
hdrSize);
// Try and get cached header. Will serve us in rare case where
onDiskSizeWithHeaderL is -1
// and will save us having to seek the stream backwards to reread the
header we
@@ -1653,8 +1675,9 @@ public class HFileBlock implements Cacheable {
// in a LOG every time we seek. See HBASE-17072 for more detail.
if (headerBuf == null) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Extra see to get block size!", new RuntimeException());
+ LOG.trace("Extra seek to get block size!", new RuntimeException());
}
+ span.addEvent("Extra seek to get block size!",
attributesBuilder.build());
headerBuf = HEAP.allocate(hdrSize);
readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
headerBuf.rewind();
@@ -1707,6 +1730,7 @@ public class HFileBlock implements Cacheable {
hFileBlock.sanityCheckUncompressed();
}
LOG.trace("Read {} in {} ms", hFileBlock, duration);
+ span.addEvent("Read block", attributesBuilder.build());
// Cache next block header if we read it for the next time through
here.
if (nextBlockOnDiskSize != -1) {
cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(),
onDiskBlock,
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 1d126f32514..e72dada5403 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import static
org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.BLOCK_CACHE_KEY_KEY;
+
+import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.context.Scope;
import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -44,7 +46,6 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
-import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.IdLock;
@@ -1252,11 +1253,12 @@ public abstract class HFileReaderImpl implements
HFile.Reader, Configurable {
BlockCacheKey cacheKey =
new BlockCacheKey(name, dataBlockOffset, this.isPrimaryReplicaReader(),
expectedBlockType);
+ Attributes attributes = Attributes.of(BLOCK_CACHE_KEY_KEY,
cacheKey.toString());
boolean useLock = false;
IdLock.Entry lockEntry = null;
- Span span =
TraceUtil.getGlobalTracer().spanBuilder("HFileReaderImpl.readBlock").startSpan();
- try (Scope traceScope = span.makeCurrent()) {
+ final Span span = Span.current();
+ try {
while (true) {
// Check cache for block. If found return.
if (cacheConf.shouldReadBlockFromCache(expectedBlockType)) {
@@ -1269,9 +1271,9 @@ public abstract class HFileReaderImpl implements
HFile.Reader, Configurable {
updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding);
if (cachedBlock != null) {
if (LOG.isTraceEnabled()) {
- LOG.trace("From Cache " + cachedBlock);
+ LOG.trace("From Cache {}", cachedBlock);
}
- span.addEvent("blockCacheHit");
+ span.addEvent("block cache hit", attributes);
assert cachedBlock.isUnpacked() : "Packed block leak.";
if (cachedBlock.getBlockType().isData()) {
if (updateCacheMetrics) {
@@ -1301,7 +1303,7 @@ public abstract class HFileReaderImpl implements
HFile.Reader, Configurable {
// Carry on, please load.
}
- span.addEvent("blockCacheMiss");
+ span.addEvent("block cache miss", attributes);
// Load block from filesystem.
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset,
onDiskBlockSize, pread,
!isCompaction, shouldUseHeap(expectedBlockType));
@@ -1331,7 +1333,6 @@ public abstract class HFileReaderImpl implements
HFile.Reader, Configurable {
if (lockEntry != null) {
offsetLock.releaseLockEntry(lockEntry);
}
- span.end();
}
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
index 8561fc1c893..8bf63909fcb 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -88,23 +89,22 @@ public final class PrefetchExecutor {
delay = 0;
}
try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Prefetch requested for " + path + ", delay=" + delay + "
ms");
- }
- prefetchFutures.put(path,
- prefetchExecutorPool.schedule(runnable, delay,
TimeUnit.MILLISECONDS));
+ LOG.debug("Prefetch requested for {}, delay={} ms", path, delay);
+ final Runnable tracedRunnable =
+ TraceUtil.tracedRunnable(runnable, "PrefetchExecutor.request");
+ final Future<?> future =
+ prefetchExecutorPool.schedule(tracedRunnable, delay,
TimeUnit.MILLISECONDS);
+ prefetchFutures.put(path, future);
} catch (RejectedExecutionException e) {
prefetchFutures.remove(path);
- LOG.warn("Prefetch request rejected for " + path);
+ LOG.warn("Prefetch request rejected for {}", path);
}
}
}
public static void complete(Path path) {
prefetchFutures.remove(path);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Prefetch completed for " + path);
- }
+ LOG.debug("Prefetch completed for {}", path);
}
public static void cancel(Path path) {
@@ -113,9 +113,7 @@ public final class PrefetchExecutor {
// ok to race with other cancellation attempts
future.cancel(true);
prefetchFutures.remove(path);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Prefetch cancelled for " + path);
- }
+ LOG.debug("Prefetch cancelled for {}", path);
}
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
index c91df64b472..f3e4b87a7ca 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java
@@ -17,6 +17,15 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import static
org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
+import static
org.apache.hadoop.hbase.client.trace.hamcrest.EventMatchers.hasAttributes;
+import static
org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
+import static
org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEvents;
+import static
org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasItems;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -29,11 +38,17 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
+import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -42,6 +57,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MatcherPredicate;
+import org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers;
+import org.apache.hadoop.hbase.client.trace.hamcrest.EventMatchers;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
@@ -52,6 +70,7 @@ import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.ClassRule;
@@ -74,6 +93,9 @@ public class TestBlockIOUtils {
@Rule
public ExpectedException exception = ExpectedException.none();
+ @Rule
+ public OpenTelemetryRule otelRule = OpenTelemetryRule.create();
+
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final int NUM_TEST_BLOCKS = 2;
@@ -93,20 +115,29 @@ public class TestBlockIOUtils {
@Test
public void testReadFully() throws IOException {
- FileSystem fs = TEST_UTIL.getTestFileSystem();
- Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadFully");
- String s = "hello world";
- try (FSDataOutputStream out = fs.create(p)) {
- out.writeBytes(s);
- }
- ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(11));
- try (FSDataInputStream in = fs.open(p)) {
- BlockIOUtils.readFully(buf, in, 11);
- }
- buf.rewind();
- byte[] heapBuf = new byte[s.length()];
- buf.get(heapBuf, 0, heapBuf.length);
- assertArrayEquals(Bytes.toBytes(s), heapBuf);
+ TraceUtil.trace(() -> {
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadFully");
+ String s = "hello world";
+ try (FSDataOutputStream out = fs.create(p)) {
+ out.writeBytes(s);
+ }
+ ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(11));
+ try (FSDataInputStream in = fs.open(p)) {
+ BlockIOUtils.readFully(buf, in, 11);
+ }
+ buf.rewind();
+ byte[] heapBuf = new byte[s.length()];
+ buf.get(heapBuf, 0, heapBuf.length);
+ assertArrayEquals(Bytes.toBytes(s), heapBuf);
+ }, testName.getMethodName());
+
+ TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new
MatcherPredicate<Iterable<SpanData>>(
+ otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()),
hasEnded()))));
+ assertThat(otelRule.getSpans(),
+ hasItems(allOf(hasName(testName.getMethodName()),
+
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readFully"),
+ hasAttributes(containsEntry("db.hbase.io.heap_bytes_read",
11))))))));
}
@Test
@@ -214,33 +245,69 @@ public class TestBlockIOUtils {
try (FSDataOutputStream out = fs.create(p)) {
out.writeBytes(s);
}
- ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(8));
- try (FSDataInputStream in = fs.open(p)) {
- assertTrue(BlockIOUtils.readWithExtra(buf, in, 6, 2));
+
+ Span span = TraceUtil.createSpan(testName.getMethodName());
+ try (Scope ignored = span.makeCurrent()) {
+ ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(8));
+ try (FSDataInputStream in = fs.open(p)) {
+ assertTrue(BlockIOUtils.readWithExtra(buf, in, 6, 2));
+ }
+ buf.rewind();
+ byte[] heapBuf = new byte[buf.capacity()];
+ buf.get(heapBuf, 0, heapBuf.length);
+ assertArrayEquals(Bytes.toBytes("hello wo"), heapBuf);
+ } finally {
+ span.end();
}
- buf.rewind();
- byte[] heapBuf = new byte[buf.capacity()];
- buf.get(heapBuf, 0, heapBuf.length);
- assertArrayEquals(Bytes.toBytes("hello wo"), heapBuf);
-
- buf = new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4),
ByteBuffer.allocate(4));
- try (FSDataInputStream in = fs.open(p)) {
- assertTrue(BlockIOUtils.readWithExtra(buf, in, 8, 3));
+ TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new
MatcherPredicate<Iterable<SpanData>>(
+ otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()),
hasEnded()))));
+ assertThat(otelRule.getSpans(),
+ hasItems(allOf(hasName(testName.getMethodName()),
+
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"),
+ hasAttributes(containsEntry("db.hbase.io.heap_bytes_read",
8L))))))));
+
+ otelRule.clearSpans();
+ span = TraceUtil.createSpan(testName.getMethodName());
+ try (Scope ignored = span.makeCurrent()) {
+ ByteBuff buf =
+ new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4),
ByteBuffer.allocate(4));
+ try (FSDataInputStream in = fs.open(p)) {
+ assertTrue(BlockIOUtils.readWithExtra(buf, in, 8, 3));
+ }
+ buf.rewind();
+ byte[] heapBuf = new byte[11];
+ buf.get(heapBuf, 0, heapBuf.length);
+ assertArrayEquals(Bytes.toBytes("hello world"), heapBuf);
+ } finally {
+ span.end();
}
- buf.rewind();
- heapBuf = new byte[11];
- buf.get(heapBuf, 0, heapBuf.length);
- assertArrayEquals(Bytes.toBytes("hello world"), heapBuf);
-
- buf.position(0).limit(12);
- try (FSDataInputStream in = fs.open(p)) {
- try {
+ TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new
MatcherPredicate<Iterable<SpanData>>(
+ otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()),
hasEnded()))));
+ assertThat(otelRule.getSpans(),
+ hasItems(allOf(hasName(testName.getMethodName()),
+
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"),
+ hasAttributes(containsEntry("db.hbase.io.heap_bytes_read",
11L))))))));
+
+ otelRule.clearSpans();
+ span = TraceUtil.createSpan(testName.getMethodName());
+ try (Scope ignored = span.makeCurrent()) {
+ ByteBuff buf =
+ new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4),
ByteBuffer.allocate(4));
+ buf.position(0).limit(12);
+ exception.expect(IOException.class);
+ try (FSDataInputStream in = fs.open(p)) {
BlockIOUtils.readWithExtra(buf, in, 12, 0);
fail("Should only read 11 bytes");
- } catch (IOException e) {
-
}
+ } finally {
+ span.end();
}
+ TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new
MatcherPredicate<Iterable<SpanData>>(
+ otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()),
hasEnded()))));
+ assertThat(otelRule.getSpans(),
+ hasItems(allOf(hasName(testName.getMethodName()),
+
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.readWithExtra"),
+ hasAttributes(containsEntry("db.hbase.io.heap_bytes_read",
11L))))))));
}
@Test
@@ -255,11 +322,20 @@ public class TestBlockIOUtils {
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
when(in.hasCapability(anyString())).thenReturn(false);
- boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen,
extraLen);
+ boolean ret =
+ TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position,
necessaryLen, extraLen),
+ testName.getMethodName());
assertFalse("Expect false return when no extra bytes requested", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verify(in).hasCapability(anyString());
verifyNoMoreInteractions(in);
+
+ TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new
MatcherPredicate<Iterable<SpanData>>(
+ otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()),
hasEnded()))));
+ assertThat(otelRule.getSpans(),
+ hasItems(allOf(hasName(testName.getMethodName()),
+
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
+ hasAttributes(containsEntry("db.hbase.io.heap_bytes_read",
totalLen))))))));
}
@Test
@@ -275,12 +351,21 @@ public class TestBlockIOUtils {
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
when(in.read(5, buf, 5, 5)).thenReturn(5);
when(in.hasCapability(anyString())).thenReturn(false);
- boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen,
extraLen);
+ boolean ret =
+ TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position,
necessaryLen, extraLen),
+ testName.getMethodName());
assertFalse("Expect false return when no extra bytes requested", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verify(in).read(5, buf, 5, 5);
verify(in).hasCapability(anyString());
verifyNoMoreInteractions(in);
+
+ TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new
MatcherPredicate<Iterable<SpanData>>(
+ otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()),
hasEnded()))));
+ assertThat(otelRule.getSpans(),
+ hasItems(allOf(hasName(testName.getMethodName()),
+
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
+ hasAttributes(containsEntry("db.hbase.io.heap_bytes_read",
totalLen))))))));
}
@Test
@@ -295,11 +380,20 @@ public class TestBlockIOUtils {
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen);
when(in.hasCapability(anyString())).thenReturn(false);
- boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen,
extraLen);
+ boolean ret =
+ TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position,
necessaryLen, extraLen),
+ testName.getMethodName());
assertTrue("Expect true return when reading extra bytes succeeds", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verify(in).hasCapability(anyString());
verifyNoMoreInteractions(in);
+
+ TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new
MatcherPredicate<Iterable<SpanData>>(
+ otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()),
hasEnded()))));
+ assertThat(otelRule.getSpans(),
+ hasItems(allOf(hasName(testName.getMethodName()),
+
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
+ hasAttributes(containsEntry("db.hbase.io.heap_bytes_read",
totalLen))))))));
}
@Test
@@ -314,11 +408,20 @@ public class TestBlockIOUtils {
FSDataInputStream in = mock(FSDataInputStream.class);
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(necessaryLen);
when(in.hasCapability(anyString())).thenReturn(false);
- boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen,
extraLen);
+ boolean ret =
+ TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position,
necessaryLen, extraLen),
+ testName.getMethodName());
assertFalse("Expect false return when reading extra bytes fails", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verify(in).hasCapability(anyString());
verifyNoMoreInteractions(in);
+
+ TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new
MatcherPredicate<Iterable<SpanData>>(
+ otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()),
hasEnded()))));
+ assertThat(otelRule.getSpans(),
+ hasItems(allOf(hasName(testName.getMethodName()),
+
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
+ hasAttributes(containsEntry("db.hbase.io.heap_bytes_read",
necessaryLen))))))));
}
@Test
@@ -334,12 +437,21 @@ public class TestBlockIOUtils {
when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5);
when(in.read(5, buf, 5, 10)).thenReturn(10);
when(in.hasCapability(anyString())).thenReturn(false);
- boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen,
extraLen);
+ boolean ret =
+ TraceUtil.trace(() -> BlockIOUtils.preadWithExtra(bb, in, position,
necessaryLen, extraLen),
+ testName.getMethodName());
assertTrue("Expect true return when reading extra bytes succeeds", ret);
verify(in).read(position, buf, bufOffset, totalLen);
verify(in).read(5, buf, 5, 10);
verify(in).hasCapability(anyString());
verifyNoMoreInteractions(in);
+
+ TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new
MatcherPredicate<Iterable<SpanData>>(
+ otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()),
hasEnded()))));
+ assertThat(otelRule.getSpans(),
+ hasItems(allOf(hasName(testName.getMethodName()),
+
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
+ hasAttributes(containsEntry("db.hbase.io.heap_bytes_read",
totalLen))))))));
}
@Test
@@ -357,7 +469,23 @@ public class TestBlockIOUtils {
when(in.hasCapability(anyString())).thenReturn(false);
exception.expect(IOException.class);
exception.expectMessage("EOF");
- BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
+ Span span = TraceUtil.createSpan(testName.getMethodName());
+ try (Scope ignored = span.makeCurrent()) {
+ BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen);
+ span.setStatus(StatusCode.OK);
+ } catch (IOException e) {
+ TraceUtil.setError(span, e);
+ throw e;
+ } finally {
+ span.end();
+
+ TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new
MatcherPredicate<Iterable<SpanData>>(
+ otelRule::getSpans, hasItem(allOf(hasName(testName.getMethodName()),
hasEnded()))));
+ assertThat(otelRule.getSpans(),
+ hasItems(allOf(hasName(testName.getMethodName()),
+
hasEvents(hasItem(allOf(EventMatchers.hasName("BlockIOUtils.preadWithExtra"),
+ hasAttributes(AttributesMatchers.isEmpty())))))));
+ }
}
/**
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index 3f09c0ca2a7..35c4736c2fd 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -17,12 +17,23 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import static
org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
+import static
org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
+import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
+import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -30,21 +41,28 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Category({ IOTests.class, MediumTests.class })
public class TestPrefetch {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestPrefetch.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
@@ -61,6 +79,9 @@ public class TestPrefetch {
private FileSystem fs;
private BlockCache blockCache;
+ @Rule
+ public OpenTelemetryRule otelRule = OpenTelemetryRule.create();
+
@Before
public void setUp() throws IOException {
conf = TEST_UTIL.getConfiguration();
@@ -82,8 +103,23 @@ public class TestPrefetch {
@Test
public void testPrefetch() throws Exception {
- Path storeFile = writeStoreFile("TestPrefetch");
- readStoreFile(storeFile);
+ TraceUtil.trace(() -> {
+ Path storeFile = writeStoreFile("TestPrefetch");
+ readStoreFile(storeFile);
+ }, "testPrefetch");
+
+ TEST_UTIL.waitFor(TimeUnit.MINUTES.toMillis(1), new
MatcherPredicate<>(otelRule::getSpans,
+ hasItems(hasName("testPrefetch"), hasName("PrefetchExecutor.request"))));
+ final List<SpanData> spans = otelRule.getSpans();
+ if (LOG.isDebugEnabled()) {
+ StringTraceRenderer renderer = new StringTraceRenderer(spans);
+ renderer.render(LOG::debug);
+ }
+
+ final SpanData testSpan =
spans.stream().filter(hasName("testPrefetch")::matches).findFirst()
+ .orElseThrow(AssertionError::new);
+ assertThat("prefetch spans happen on their own threads, detached from file
open.", spans,
+ hasItem(allOf(hasName("PrefetchExecutor.request"),
not(hasParentSpanId(testSpan)))));
}
@Test