This is an automated email from the ASF dual-hosted git repository. pdallig pushed a commit to branch branch-0.12 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.12 by this push: new d6cf16ea7d [ZEPPELIN-6264] Refactor InfluxDBInterpreter for improved readability and maintainability d6cf16ea7d is described below commit d6cf16ea7db3ed0d24c81e6c1fb36407d61145e1 Author: eunhwa99 <68810660+eunhw...@users.noreply.github.com> AuthorDate: Mon Aug 4 17:01:15 2025 +0900 [ZEPPELIN-6264] Refactor InfluxDBInterpreter for improved readability and maintainability ### What is this PR for? This PR refactors the `InfluxDBInterpreter` class to improve code readability, maintainability, and adherence to modern Java practices, without altering its runtime behavior or core logic. - Key changes include: - Renamed getInfluxDBClient() to getQueryApi() to better reflect its purpose, improving semantic clarity. - Removed unnecessary parameters (e.g., InterpreterContext) from methods where they are unused. - Removed the `throws InterpreterException` clause from methods like open(), close(), cancel(), getFormType(), and getProgress(), where no exceptions are thrown. - Extracted long nested logic blocks in `internalInterpret()` into smaller, well-named private methods. - Replaced imperative loops with Stream operations for collection processing. These changes aim to make the codebase more modular, cleaner (by reducing boilerplate code), and more approachable for future contributors and reviewers. ### What type of PR is it? Refactoring ### Todos * [ ] - Task ### What is the Jira issue? * [ZEPPELIN-6264](https://issues.apache.org/jira/browse/ZEPPELIN-6264) ### How should this be tested? * No functional changes; existing tests should pass as-is. ### Screenshots (if appropriate) ### Questions: * Do the license files need to be updated? no * Are there breaking changes for older versions? no * Does this need documentation? no Closes #5005 from eunhwa99/ZEPPELIN-6264. 
Signed-off-by: Philipp Dallig <philipp.dal...@gmail.com> (cherry picked from commit ab52456f84b6fe9244a8be3e8b57aeb587b22e5a) Signed-off-by: Philipp Dallig <philipp.dal...@gmail.com> --- .../zeppelin/influxdb/InfluxDBInterpreter.java | 119 ++++++++++++--------- 1 file changed, 66 insertions(+), 53 deletions(-) diff --git a/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java b/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java index bb8e62bd9c..3f718fabda 100644 --- a/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java +++ b/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java @@ -14,8 +14,8 @@ */ package org.apache.zeppelin.influxdb; +import com.influxdb.query.FluxRecord; import java.util.Properties; -import java.util.StringJoiner; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -24,6 +24,7 @@ import com.influxdb.client.InfluxDBClient; import com.influxdb.client.InfluxDBClientFactory; import com.influxdb.client.InfluxDBClientOptions; import com.influxdb.client.QueryApi; +import java.util.stream.Collectors; import org.apache.zeppelin.interpreter.AbstractInterpreter; import org.apache.zeppelin.interpreter.ZeppelinContext; import org.slf4j.Logger; @@ -83,71 +84,83 @@ public class InfluxDBInterpreter extends AbstractInterpreter { LOGGER.debug("Run Flux command '{}'", query); query = query.trim(); - QueryApi queryService = getInfluxDBClient(context); + QueryApi queryService = getQueryApi(); - final int[] actualIndex = {-1}; + final int[] currentTableIndex = {-1}; AtomicReference<InterpreterResult> resultRef = new AtomicReference<>(); CountDownLatch countDownLatch = new CountDownLatch(1); - StringBuilder result = new StringBuilder(); + StringBuilder resultBuilder = new StringBuilder(); queryService.query( query, //process record - (cancellable, fluxRecord) -> { - - Integer tableIndex = fluxRecord.getTable(); - if 
(actualIndex[0] != tableIndex) { - result.append(NEWLINE); - result.append(TABLE_MAGIC_TAG); - actualIndex[0] = tableIndex; - - //add column names to table header - StringJoiner joiner = new StringJoiner(TAB); - fluxRecord.getValues().keySet().forEach(c -> joiner.add(replaceReservedChars(c))); - result.append(joiner.toString()); - result.append(NEWLINE); - } - - StringJoiner rowsJoiner = new StringJoiner(TAB); - for (Object value : fluxRecord.getValues().values()) { - if (value == null) { - value = EMPTY_COLUMN_VALUE; - } - rowsJoiner.add(replaceReservedChars(value.toString())); - } - result.append(rowsJoiner.toString()); - result.append(NEWLINE); - }, - - throwable -> { - - LOGGER.error(throwable.getMessage(), throwable); - resultRef.set(new InterpreterResult(InterpreterResult.Code.ERROR, - throwable.getMessage())); - - countDownLatch.countDown(); - - }, () -> { - //on complete - InterpreterResult intpResult = new InterpreterResult(InterpreterResult.Code.SUCCESS); - intpResult.add(result.toString()); - resultRef.set(intpResult); - countDownLatch.countDown(); - } + (cancellable, fluxRecord) -> handleRecord(fluxRecord, currentTableIndex, resultBuilder), + throwable -> handleError(throwable, resultRef, countDownLatch), + () -> handleComplete(resultBuilder, resultRef, countDownLatch) ); + + awaitLatch(countDownLatch); + + return resultRef.get(); + } + + private void handleRecord(FluxRecord fluxRecord, int[] currentTableIndex, + StringBuilder resultBuilder) { + Integer tableIndex = fluxRecord.getTable(); + if (currentTableIndex[0] != tableIndex) { + appendTableHeader(fluxRecord, resultBuilder); + currentTableIndex[0] = tableIndex; + } + + appendTableRow(fluxRecord, resultBuilder); + } + + private void appendTableHeader(FluxRecord fluxRecord, StringBuilder resultBuilder) { + resultBuilder.append(NEWLINE).append(TABLE_MAGIC_TAG); + String headerLine = fluxRecord.getValues().keySet().stream() + .map(this::replaceReservedChars) + .collect(Collectors.joining(TAB)); + 
resultBuilder.append(headerLine).append(NEWLINE); + } + + private void appendTableRow(FluxRecord fluxRecord, StringBuilder resultBuilder) { + String rowLine = fluxRecord.getValues().values().stream() + .map(v -> v == null ? EMPTY_COLUMN_VALUE : v.toString()) + .map(this::replaceReservedChars) + .collect(Collectors.joining(TAB)); + resultBuilder.append(rowLine).append(NEWLINE); + } + + private static void handleError(Throwable throwable, AtomicReference<InterpreterResult> resultRef, + CountDownLatch countDownLatch) { + LOGGER.error(throwable.getMessage(), throwable); + resultRef.set(new InterpreterResult(InterpreterResult.Code.ERROR, + throwable.getMessage())); + + countDownLatch.countDown(); + } + + private static void handleComplete(StringBuilder resultBuilder, + AtomicReference<InterpreterResult> resultRef, + CountDownLatch countDownLatch) { + InterpreterResult intpResult = new InterpreterResult(InterpreterResult.Code.SUCCESS); + intpResult.add(resultBuilder.toString()); + resultRef.set(intpResult); + countDownLatch.countDown(); + } + + private static void awaitLatch(CountDownLatch countDownLatch) throws InterpreterException { try { countDownLatch.await(); } catch (InterruptedException e) { throw new InterpreterException(e); } - - return resultRef.get(); } - private QueryApi getInfluxDBClient(InterpreterContext context) { + private QueryApi getQueryApi() { if (queryApi == null) { queryApi = this.client.getQueryApi(); } @@ -156,7 +169,7 @@ public class InfluxDBInterpreter extends AbstractInterpreter { @Override - public void open() throws InterpreterException { + public void open() { if (this.client == null) { InfluxDBClientOptions opt = InfluxDBClientOptions.builder() @@ -172,7 +185,7 @@ public class InfluxDBInterpreter extends AbstractInterpreter { } @Override - public void close() throws InterpreterException { + public void close() { if (this.client != null) { this.client.close(); this.client = null; @@ -180,17 +193,17 @@ public class InfluxDBInterpreter 
extends AbstractInterpreter { } @Override - public void cancel(InterpreterContext context) throws InterpreterException { + public void cancel(InterpreterContext context) { } @Override - public FormType getFormType() throws InterpreterException { + public FormType getFormType() { return FormType.SIMPLE; } @Override - public int getProgress(InterpreterContext context) throws InterpreterException { + public int getProgress(InterpreterContext context) { return 0; }