This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 62331b6f9e API: Remove overflow checks in DefaultCounter causing
performance issues (#8297)
62331b6f9e is described below
commit 62331b6f9edea5bd42752882f980f6768bb69d16
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Aug 14 11:37:44 2023 -0700
API: Remove overflow checks in DefaultCounter causing performance issues
(#8297)
---
.../java/org/apache/iceberg/metrics/Counter.java | 4 ++
.../org/apache/iceberg/metrics/DefaultCounter.java | 8 ++-
.../apache/iceberg/metrics/TestDefaultCounter.java | 10 ---
.../iceberg/metrics/TestDefaultMetricsContext.java | 16 +----
.../apache/iceberg/metrics/CountersBenchmark.java | 82 ++++++++++++++++++++++
5 files changed, 93 insertions(+), 27 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/metrics/Counter.java
b/api/src/main/java/org/apache/iceberg/metrics/Counter.java
index e8d1a899fe..f984c608de 100644
--- a/api/src/main/java/org/apache/iceberg/metrics/Counter.java
+++ b/api/src/main/java/org/apache/iceberg/metrics/Counter.java
@@ -29,6 +29,8 @@ public interface Counter {
/**
* Increment the counter by the provided amount.
*
+ * <p>Implementations may skip the overflow check for better write
throughput.
+ *
* @param amount to be incremented.
*/
default void increment(int amount) {
@@ -38,6 +40,8 @@ public interface Counter {
/**
* Increment the counter by the provided amount.
*
+ * <p>Implementations may skip the overflow check for better write
throughput.
+ *
* @param amount to be incremented.
*/
void increment(long amount);
diff --git a/api/src/main/java/org/apache/iceberg/metrics/DefaultCounter.java
b/api/src/main/java/org/apache/iceberg/metrics/DefaultCounter.java
index 9bb23092a8..fad9bf8c57 100644
--- a/api/src/main/java/org/apache/iceberg/metrics/DefaultCounter.java
+++ b/api/src/main/java/org/apache/iceberg/metrics/DefaultCounter.java
@@ -63,7 +63,6 @@ public class DefaultCounter implements Counter {
@Override
public void increment(long amount) {
- Math.addExact(counter.longValue(), amount);
counter.add(amount);
}
@@ -107,7 +106,6 @@ public class DefaultCounter implements Counter {
@Override
public void increment(Integer amount) {
- Math.addExact(counter.intValue(), amount);
DefaultCounter.this.increment(amount);
}
@@ -118,7 +116,11 @@ public class DefaultCounter implements Counter {
@Override
public Integer value() {
- return counter.intValue();
+ long value = counter.longValue();
+ if (value > Integer.MAX_VALUE) {
+ throw new ArithmeticException("integer overflow");
+ }
+ return (int) value;
}
@Override
diff --git
a/api/src/test/java/org/apache/iceberg/metrics/TestDefaultCounter.java
b/api/src/test/java/org/apache/iceberg/metrics/TestDefaultCounter.java
index 5f62e20ab4..f77da34dca 100644
--- a/api/src/test/java/org/apache/iceberg/metrics/TestDefaultCounter.java
+++ b/api/src/test/java/org/apache/iceberg/metrics/TestDefaultCounter.java
@@ -49,14 +49,4 @@ public class TestDefaultCounter {
Assertions.assertThat(counter.unit()).isEqualTo(MetricsContext.Unit.BYTES);
Assertions.assertThat(counter.isNoop()).isFalse();
}
-
- @Test
- public void counterOverflow() {
- Counter counter = new DefaultCounter(MetricsContext.Unit.COUNT);
- counter.increment(Long.MAX_VALUE);
- Assertions.assertThatThrownBy(counter::increment)
- .isInstanceOf(ArithmeticException.class)
- .hasMessage("long overflow");
- Assertions.assertThat(counter.value()).isEqualTo(Long.MAX_VALUE);
- }
}
diff --git
a/api/src/test/java/org/apache/iceberg/metrics/TestDefaultMetricsContext.java
b/api/src/test/java/org/apache/iceberg/metrics/TestDefaultMetricsContext.java
index 86c216119e..3819485040 100644
---
a/api/src/test/java/org/apache/iceberg/metrics/TestDefaultMetricsContext.java
+++
b/api/src/test/java/org/apache/iceberg/metrics/TestDefaultMetricsContext.java
@@ -60,10 +60,10 @@ public class TestDefaultMetricsContext {
MetricsContext.Counter<Integer> counter =
metricsContext.counter("test", Integer.class,
MetricsContext.Unit.COUNT);
counter.increment(Integer.MAX_VALUE);
- Assertions.assertThatThrownBy(counter::increment)
+ counter.increment();
+ Assertions.assertThatThrownBy(counter::value)
.isInstanceOf(ArithmeticException.class)
.hasMessage("integer overflow");
- Assertions.assertThat(counter.value()).isEqualTo(Integer.MAX_VALUE);
}
@Test
@@ -84,18 +84,6 @@ public class TestDefaultMetricsContext {
Assertions.assertThat(counter.unit()).isEqualTo(MetricsContext.Unit.COUNT);
}
- @Test
- public void longCounterOverflow() {
- MetricsContext metricsContext = new DefaultMetricsContext();
- MetricsContext.Counter<Long> counter =
- metricsContext.counter("test", Long.class, MetricsContext.Unit.COUNT);
- counter.increment(Long.MAX_VALUE);
- Assertions.assertThatThrownBy(counter::increment)
- .isInstanceOf(ArithmeticException.class)
- .hasMessage("long overflow");
- Assertions.assertThat(counter.value()).isEqualTo(Long.MAX_VALUE);
- }
-
@Test
public void timer() {
MetricsContext metricsContext = new DefaultMetricsContext();
diff --git
a/core/src/jmh/java/org/apache/iceberg/metrics/CountersBenchmark.java
b/core/src/jmh/java/org/apache/iceberg/metrics/CountersBenchmark.java
new file mode 100644
index 0000000000..f9a5a46cf3
--- /dev/null
+++ b/core/src/jmh/java/org/apache/iceberg/metrics/CountersBenchmark.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.metrics.MetricsContext.Unit;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.infra.Blackhole;
+
+@Fork(1)
+@State(Scope.Benchmark)
+@Measurement(iterations = 25)
+@BenchmarkMode(Mode.SingleShotTime)
+@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
+public class CountersBenchmark {
+
+ private static final int NUM_OPERATIONS = 10_000_000;
+ private static final int WORKER_POOL_SIZE = 16;
+ private static final int INCREMENT_AMOUNT = 10_000;
+
+ @Benchmark
+ @Threads(1)
+ public void defaultCounterMultipleThreads(Blackhole blackhole) {
+ Counter counter = new DefaultCounter(Unit.BYTES);
+
+ ExecutorService workerPool = ThreadPools.newWorkerPool("bench-pool",
WORKER_POOL_SIZE);
+
+ try {
+ Tasks.range(WORKER_POOL_SIZE)
+ .executeWith(workerPool)
+ .run(
+ (id) -> {
+ for (int operation = 0; operation < NUM_OPERATIONS;
operation++) {
+ counter.increment(INCREMENT_AMOUNT);
+ }
+ });
+ } finally {
+ workerPool.shutdown();
+ }
+
+ blackhole.consume(counter);
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void defaultCounterSingleThread(Blackhole blackhole) {
+ Counter counter = new DefaultCounter(Unit.BYTES);
+
+ for (int operation = 0; operation < WORKER_POOL_SIZE * NUM_OPERATIONS;
operation++) {
+ counter.increment(INCREMENT_AMOUNT);
+ }
+
+ blackhole.consume(counter);
+ }
+}