npawar commented on code in PR #10528:
URL: https://github.com/apache/pinot/pull/10528#discussion_r1210945646
##########
.github/workflows/pinot_tests.yml:
##########
@@ -120,16 +124,20 @@ jobs:
if: github.repository == 'apache/pinot'
runs-on: ubuntu-latest
strategy:
+ # Changed to false in order to improve coverage using unsafe buffers
+ fail-fast: false
matrix:
testset: [ 1, 2 ]
- name: Pinot Integration Test Set ${{ matrix.testset }}
+ java: [ 11, 17, 20 ]
+ distribution: [ "temurin" ]
+ name: Pinot Integration Test Set ${{ matrix.testset }}
(${{matrix.distribution}}-${{matrix.java}})
Review Comment:
Will the GitHub Actions end-to-end run now take 6 times as long as before?
Or would the jobs run in parallel? If the latter, do we know that we have enough
resources there to handle the increased number of tasks?
##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java:
##########
@@ -99,6 +104,101 @@ public String toString() {
private static final AtomicLong ALLOCATION_FAILURE_COUNT = new AtomicLong();
private static final Map<PinotDataBuffer, BufferContext> BUFFER_CONTEXT_MAP
= new WeakHashMap<>();
+ /**
+ * Configuration key used to change the offheap buffer factory used by Pinot.
+ * Value should be the qualified path of a class that extends {@link
PinotBufferFactory} and has empty
+ * constructor.
+ */
+ private static final String OFFHEAP_BUFFER_FACTORY_CONFIG =
"pinot.offheap.buffer.factory";
+ /**
+ * Boolean configuration that decides whether to allocate using {@link
ByteBufferPinotBufferFactory} when the buffer
+ * to allocate fits in a {@link ByteBuffer}.
+ *
+ * Defaults to true.
+ */
+ private static final String OFFHEAP_BUFFER_PRIORITIZE_BYTE_BUFFER_CONFIG =
"pinot.offheap.prioritize.bytebuffer";
Review Comment:
So this config exists mainly so that we can switch off the default behaviour of
using PinotByteBuffer for buffers < 2GB in the newer implementations, if we
wanted to?
##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java:
##########
@@ -99,6 +104,101 @@ public String toString() {
private static final AtomicLong ALLOCATION_FAILURE_COUNT = new AtomicLong();
private static final Map<PinotDataBuffer, BufferContext> BUFFER_CONTEXT_MAP
= new WeakHashMap<>();
+ /**
+ * Configuration key used to change the offheap buffer factory used by Pinot.
+ * Value should be the qualified path of a class that extends {@link
PinotBufferFactory} and has empty
+ * constructor.
+ */
+ private static final String OFFHEAP_BUFFER_FACTORY_CONFIG =
"pinot.offheap.buffer.factory";
+ /**
+ * Boolean configuration that decides whether to allocate using {@link
ByteBufferPinotBufferFactory} when the buffer
+ * to allocate fits in a {@link ByteBuffer}.
+ *
+ * Defaults to true.
+ */
+ private static final String OFFHEAP_BUFFER_PRIORITIZE_BYTE_BUFFER_CONFIG =
"pinot.offheap.prioritize.bytebuffer";
+
+ /**
+ * The default {@link PinotBufferFactory} used by all threads that do not
define their own factory.
+ */
+ private static PinotBufferFactory _defaultFactory = createDefaultFactory();
+ /**
+ * A thread local variable that can be used to customize the {@link
PinotBufferFactory} used on tests. This is mostly
+ * useful in tests.
+ */
+ private static final ThreadLocal<PinotBufferFactory> _FACTORY = new
ThreadLocal<>();
+
+ /**
+ * Change the {@link PinotBufferFactory} used by the current thread.
+ *
+ * If this method is not called, the default factory configured at startup
time will be used.
+ *
+ * @see #loadDefaultFactory(PinotConfiguration)
+ */
+ public static void useFactory(PinotBufferFactory factory) {
+ _FACTORY.set(factory);
+ }
+
+ /**
+ * Returns the factory the current thread should use.
+ */
+ public static PinotBufferFactory getFactory() {
+ PinotBufferFactory pinotBufferFactory = _FACTORY.get();
+ if (pinotBufferFactory == null) {
+ pinotBufferFactory = _defaultFactory;
+ }
+ return pinotBufferFactory;
+ }
+
+ public static PinotBufferFactory createDefaultFactory() {
+ return createDefaultFactory(true);
+ }
+
+ public static PinotBufferFactory createDefaultFactory(boolean
prioritizeByteBuffer) {
+ String factoryClassName;
+ if (JavaVersion.VERSION < 16) {
+ LOGGER.info("Using LArray as buffer on JVM version {}",
JavaVersion.VERSION);
+ factoryClassName = LArrayPinotBufferFactory.class.getCanonicalName();
+ } else {
+ LOGGER.info("Using Unsafe as buffer on JVM version {}",
JavaVersion.VERSION);
+ factoryClassName = UnsafePinotBufferFactory.class.getCanonicalName();
+ }
+ return createFactory(factoryClassName, prioritizeByteBuffer);
+ }
+
+ private static PinotBufferFactory createFactory(String factoryClassName,
boolean prioritizeByteBuffer) {
+ try {
+ LOGGER.info("Instantiating Pinot buffer factory class {}",
factoryClassName);
+ PinotBufferFactory factory = (PinotBufferFactory)
Class.forName(factoryClassName).getConstructor().newInstance();
+
+ if (prioritizeByteBuffer) {
+ factory = new SmallWithFallbackPinotBufferFactory(new
ByteBufferPinotBufferFactory(), factory);
+ }
+
+ return factory;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Configures the default {@link PinotBufferFactory}.
+ *
+ * This method guarantees that threads that didn't use the factory before
this method is called are going to use the
+ * new factory. In other words, threads that were already running when this
method was called may use other factories.
+ * Therefore it is recommended to call this method during Pinot startup.
+ */
+ public static void loadDefaultFactory(PinotConfiguration configuration) {
+ boolean prioritizeByteBuffer =
configuration.getProperty(OFFHEAP_BUFFER_PRIORITIZE_BYTE_BUFFER_CONFIG, true);
+ String factoryClassName =
configuration.getProperty(OFFHEAP_BUFFER_FACTORY_CONFIG);
+ if (factoryClassName != null) {
+ _defaultFactory = createFactory(factoryClassName, prioritizeByteBuffer);
+ } else {
+ LOGGER.info("No custom Pinot buffer factory class found in
configuration. Using default factory");
+ _defaultFactory = createDefaultFactory();
Review Comment:
Don't you want to pass `prioritizeByteBuffer` to `createDefaultFactory` in
this case?
##########
.github/workflows/scripts/.pinot_test.sh:
##########
@@ -79,7 +83,9 @@ else
-pl '!:pinot-csv' \
-pl '!:pinot-json' \
-pl '!:pinot-segment-uploader-default' \
- -P github-actions,no-integration-tests && exit 0 || exit 1
+ -P github-actions,no-integration-tests \
+ -Dspotless.apply.skip -Dcheckstyle.skip -Dspotless.apply.skip
-Dlicense.skip=true \
Review Comment:
why add skips for all these?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]