This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch branch-0.12
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.12 by this push:
     new d6cf16ea7d [ZEPPELIN-6264] Refactor InfluxDBInterpreter for improved 
readability and maintainability
d6cf16ea7d is described below

commit d6cf16ea7db3ed0d24c81e6c1fb36407d61145e1
Author: eunhwa99 <68810660+eunhw...@users.noreply.github.com>
AuthorDate: Mon Aug 4 17:01:15 2025 +0900

    [ZEPPELIN-6264] Refactor InfluxDBInterpreter for improved readability and 
maintainability
    
    ### What is this PR for?
    This PR refactors the `InfluxDBInterpreter.class` to improve code 
readability, maintainability, and adherence to modern Java practices, without 
altering its runtime behavior or core logic.
    
    - Key changes include:
      - Renamed getInfluxDBClient() to getQueryApi() so that the name reflects 
what the method actually returns, improving semantic clarity.
      - Removed unnecessary code:
         - Unused parameters (e.g., InterpreterContext) from methods that do not use them.
         - Unneeded `throws InterpreterException` clauses on methods like open(), 
close(), cancel(), getFormType(), and getProgress(), which never throw.
      - Extracted long nested logic blocks in `internalInterpret()` into 
smaller, well-named private methods.
      - Replaced imperative loops with Stream operations for collection 
processing.
    
    These changes aim to make the codebase more modular, cleaner (by reducing 
boilerplate code), and more approachable for future contributors and reviewers.
    
    ### What type of PR is it?
    Refactoring
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * [ZEPPELIN-6264](https://issues.apache.org/jira/browse/ZEPPELIN-6264)
    
    ### How should this be tested?
    * No functional changes; existing tests should pass as-is.
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need to be updated? no
    * Are there breaking changes for older versions? no
    * Does this need documentation? no
    
    Closes #5005 from eunhwa99/ZEPPELIN-6264.
    
    Signed-off-by: Philipp Dallig <philipp.dal...@gmail.com>
    (cherry picked from commit ab52456f84b6fe9244a8be3e8b57aeb587b22e5a)
    Signed-off-by: Philipp Dallig <philipp.dal...@gmail.com>
---
 .../zeppelin/influxdb/InfluxDBInterpreter.java     | 119 ++++++++++++---------
 1 file changed, 66 insertions(+), 53 deletions(-)

diff --git 
a/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java 
b/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java
index bb8e62bd9c..3f718fabda 100644
--- 
a/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java
+++ 
b/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java
@@ -14,8 +14,8 @@
  */
 package org.apache.zeppelin.influxdb;
 
+import com.influxdb.query.FluxRecord;
 import java.util.Properties;
-import java.util.StringJoiner;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -24,6 +24,7 @@ import com.influxdb.client.InfluxDBClient;
 import com.influxdb.client.InfluxDBClientFactory;
 import com.influxdb.client.InfluxDBClientOptions;
 import com.influxdb.client.QueryApi;
+import java.util.stream.Collectors;
 import org.apache.zeppelin.interpreter.AbstractInterpreter;
 import org.apache.zeppelin.interpreter.ZeppelinContext;
 import org.slf4j.Logger;
@@ -83,71 +84,83 @@ public class InfluxDBInterpreter extends 
AbstractInterpreter {
     LOGGER.debug("Run Flux command '{}'", query);
     query = query.trim();
 
-    QueryApi queryService = getInfluxDBClient(context);
+    QueryApi queryService = getQueryApi();
 
-    final int[] actualIndex = {-1};
+    final int[] currentTableIndex = {-1};
 
     AtomicReference<InterpreterResult> resultRef = new AtomicReference<>();
     CountDownLatch countDownLatch = new CountDownLatch(1);
 
-    StringBuilder result = new StringBuilder();
+    StringBuilder resultBuilder = new StringBuilder();
     queryService.query(
         query,
 
         //process record
-        (cancellable, fluxRecord) -> {
-
-          Integer tableIndex = fluxRecord.getTable();
-          if (actualIndex[0] != tableIndex) {
-            result.append(NEWLINE);
-            result.append(TABLE_MAGIC_TAG);
-            actualIndex[0] = tableIndex;
-
-            //add column names to table header
-            StringJoiner joiner = new StringJoiner(TAB);
-            fluxRecord.getValues().keySet().forEach(c -> 
joiner.add(replaceReservedChars(c)));
-            result.append(joiner.toString());
-            result.append(NEWLINE);
-          }
-
-          StringJoiner rowsJoiner = new StringJoiner(TAB);
-          for (Object value : fluxRecord.getValues().values()) {
-            if (value == null) {
-              value = EMPTY_COLUMN_VALUE;
-            }
-            rowsJoiner.add(replaceReservedChars(value.toString()));
-          }
-          result.append(rowsJoiner.toString());
-          result.append(NEWLINE);
-        },
-
-        throwable -> {
-
-          LOGGER.error(throwable.getMessage(), throwable);
-          resultRef.set(new InterpreterResult(InterpreterResult.Code.ERROR,
-              throwable.getMessage()));
-
-          countDownLatch.countDown();
-
-        }, () -> {
-          //on complete
-          InterpreterResult intpResult = new 
InterpreterResult(InterpreterResult.Code.SUCCESS);
-          intpResult.add(result.toString());
-          resultRef.set(intpResult);
-          countDownLatch.countDown();
-        }
+        (cancellable, fluxRecord) -> handleRecord(fluxRecord, 
currentTableIndex, resultBuilder),
+        throwable -> handleError(throwable, resultRef, countDownLatch),
+        () -> handleComplete(resultBuilder, resultRef, countDownLatch)
     );
+
+    awaitLatch(countDownLatch);
+
+    return resultRef.get();
+  }
+
+  private void handleRecord(FluxRecord fluxRecord, int[] currentTableIndex,
+      StringBuilder resultBuilder) {
+    Integer tableIndex = fluxRecord.getTable();
+    if (currentTableIndex[0] != tableIndex) {
+      appendTableHeader(fluxRecord, resultBuilder);
+      currentTableIndex[0] = tableIndex;
+    }
+
+    appendTableRow(fluxRecord, resultBuilder);
+  }
+
+  private void appendTableHeader(FluxRecord fluxRecord, StringBuilder 
resultBuilder) {
+    resultBuilder.append(NEWLINE).append(TABLE_MAGIC_TAG);
+    String headerLine = fluxRecord.getValues().keySet().stream()
+        .map(this::replaceReservedChars)
+        .collect(Collectors.joining(TAB));
+    resultBuilder.append(headerLine).append(NEWLINE);
+  }
+
+  private void appendTableRow(FluxRecord fluxRecord, StringBuilder 
resultBuilder) {
+    String rowLine = fluxRecord.getValues().values().stream()
+        .map(v -> v == null ? EMPTY_COLUMN_VALUE : v.toString())
+        .map(this::replaceReservedChars)
+        .collect(Collectors.joining(TAB));
+    resultBuilder.append(rowLine).append(NEWLINE);
+  }
+
+  private static void handleError(Throwable throwable, 
AtomicReference<InterpreterResult> resultRef,
+      CountDownLatch countDownLatch) {
+    LOGGER.error(throwable.getMessage(), throwable);
+    resultRef.set(new InterpreterResult(InterpreterResult.Code.ERROR,
+        throwable.getMessage()));
+
+    countDownLatch.countDown();
+  }
+
+  private static void handleComplete(StringBuilder resultBuilder,
+      AtomicReference<InterpreterResult> resultRef,
+      CountDownLatch countDownLatch) {
+    InterpreterResult intpResult = new 
InterpreterResult(InterpreterResult.Code.SUCCESS);
+    intpResult.add(resultBuilder.toString());
+    resultRef.set(intpResult);
+    countDownLatch.countDown();
+  }
+
+  private static void awaitLatch(CountDownLatch countDownLatch) throws 
InterpreterException {
     try {
       countDownLatch.await();
     } catch (InterruptedException e) {
       throw new InterpreterException(e);
     }
-
-    return resultRef.get();
   }
 
 
-  private QueryApi getInfluxDBClient(InterpreterContext context) {
+  private QueryApi getQueryApi() {
     if (queryApi == null) {
       queryApi = this.client.getQueryApi();
     }
@@ -156,7 +169,7 @@ public class InfluxDBInterpreter extends 
AbstractInterpreter {
 
 
   @Override
-  public void open() throws InterpreterException {
+  public void open() {
 
     if (this.client == null) {
       InfluxDBClientOptions opt = InfluxDBClientOptions.builder()
@@ -172,7 +185,7 @@ public class InfluxDBInterpreter extends 
AbstractInterpreter {
   }
 
   @Override
-  public void close() throws InterpreterException {
+  public void close() {
     if (this.client != null) {
       this.client.close();
       this.client = null;
@@ -180,17 +193,17 @@ public class InfluxDBInterpreter extends 
AbstractInterpreter {
   }
 
   @Override
-  public void cancel(InterpreterContext context) throws InterpreterException {
+  public void cancel(InterpreterContext context) {
 
   }
 
   @Override
-  public FormType getFormType() throws InterpreterException {
+  public FormType getFormType() {
     return FormType.SIMPLE;
   }
 
   @Override
-  public int getProgress(InterpreterContext context) throws 
InterpreterException {
+  public int getProgress(InterpreterContext context) {
     return 0;
   }
 

Reply via email to