This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ebdb03fa4 [GOBBLIN-1817] change some deprecated code and fix minor codestyle (#3678)
ebdb03fa4 is described below

commit ebdb03fa48238a9cf5b53bc77683e80af8756697
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Fri Apr 14 13:23:00 2023 -0700

    [GOBBLIN-1817] change some deprecated code and fix minor codestyle (#3678)
    
    * change some deprecated code and fix minor codestyle
    
    * fix unit tests
    
    ---------
    
    Co-authored-by: Arjun <[email protected]>
---
 .../gobblin/converter/serde/HiveSerDeConverter.java     | 10 ++++++----
 .../writer/HiveWritableHdfsDataWriterBuilder.java       |  3 ++-
 .../java/org/apache/gobblin/hive/HiveSerDeWrapper.java  | 17 +++++++++++++----
 .../gobblin/yarn/HelixInstancePurgerWithMetrics.java    |  4 ++--
 .../main/java/org/apache/gobblin/yarn/YarnService.java  | 10 +++++-----
 5 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/serde/HiveSerDeConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/serde/HiveSerDeConverter.java
index 4e696703f..1f23a9462 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/serde/HiveSerDeConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/serde/HiveSerDeConverter.java
@@ -23,8 +23,10 @@ import java.util.List;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -69,8 +71,8 @@ import org.apache.gobblin.util.HadoopUtils;
 @Slf4j
 public class HiveSerDeConverter extends InstrumentedConverter<Object, Object, Writable, Writable> {
 
-  private SerDe serializer;
-  private SerDe deserializer;
+  private Serializer serializer;
+  private Deserializer deserializer;
 
   @Override
   public HiveSerDeConverter init(WorkUnitState state) {
@@ -78,8 +80,8 @@ public class HiveSerDeConverter extends InstrumentedConverter<Object, Object, Wr
     Configuration conf = HadoopUtils.getConfFromState(state);
 
     try {
-      this.serializer = HiveSerDeWrapper.getSerializer(state).getSerDe();
-      this.deserializer = HiveSerDeWrapper.getDeserializer(state).getSerDe();
+      this.serializer = (Serializer) HiveSerDeWrapper.getSerializer(state).getSerDe();
+      this.deserializer = (Deserializer) HiveSerDeWrapper.getDeserializer(state).getSerDe();
       this.deserializer.initialize(conf, state.getProperties());
       setColumnsIfPossible(state);
       this.serializer.initialize(conf, state.getProperties());
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterBuilder.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterBuilder.java
index 5dba8617a..6b056848e 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterBuilder.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterBuilder.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.writer;
 import java.io.IOException;
 
 import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.io.Writable;
 
 import com.google.common.base.Preconditions;
@@ -54,7 +55,7 @@ public class HiveWritableHdfsDataWriterBuilder<S> extends FsDataWriterBuilder<S,
 
     if (!properties.contains(WRITER_WRITABLE_CLASS) || !properties.contains(WRITER_OUTPUT_FORMAT_CLASS)) {
       HiveSerDeWrapper serializer = HiveSerDeWrapper.getSerializer(properties);
-      properties.setProp(WRITER_WRITABLE_CLASS, serializer.getSerDe().getSerializedClass().getName());
+      properties.setProp(WRITER_WRITABLE_CLASS, ((Serializer) serializer.getSerDe()).getSerializedClass().getName());
       properties.setProp(WRITER_OUTPUT_FORMAT_CLASS, serializer.getOutputFormatClassName());
     }
 
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveSerDeWrapper.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveSerDeWrapper.java
index fea2e99d1..f01faa36f 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveSerDeWrapper.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveSerDeWrapper.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.hive;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedSerde;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
 import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
@@ -39,6 +41,7 @@ import com.google.common.base.Preconditions;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.Either;
 
 
 /**
@@ -88,7 +91,7 @@ public class HiveSerDeWrapper {
     }
   }
 
-  private Optional<SerDe> serDe = Optional.absent();
+  private Optional<Either<AbstractSerDe, VectorizedSerde>> serDe = Optional.absent();
   private final String serDeClassName;
   private final String inputFormatClassName;
   private final String outputFormatClassName;
@@ -107,15 +110,21 @@ public class HiveSerDeWrapper {
    * Get the {@link SerDe} instance associated with this {@link HiveSerDeWrapper}.
    * This method performs lazy initialization.
    */
-  public SerDe getSerDe() throws IOException {
+  public Object getSerDe() throws IOException {
     if (!this.serDe.isPresent()) {
       try {
-        this.serDe = Optional.of(SerDe.class.cast(Class.forName(this.serDeClassName).newInstance()));
+        Object serde = Class.forName(this.serDeClassName).newInstance();
+        if (serde instanceof OrcSerde) {
+          this.serDe = Optional.of(Either.right(VectorizedSerde.class.cast(serde)));
+        } else {
+          this.serDe = Optional.of(Either.left(AbstractSerDe.class.cast(serde)));
+        }
       } catch (Throwable t) {
         throw new IOException("Failed to instantiate SerDe " + this.serDeClassName, t);
       }
     }
-    return this.serDe.get();
+
+    return this.serDe.get().get();
   }
 
   /**
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/HelixInstancePurgerWithMetrics.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/HelixInstancePurgerWithMetrics.java
index efb6644b4..9ac0ccad7 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/HelixInstancePurgerWithMetrics.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/HelixInstancePurgerWithMetrics.java
@@ -42,9 +42,9 @@ public class HelixInstancePurgerWithMetrics {
 
 
   /**
-   * Blocking call for purging all offline helix instances. Provides boiler plate code for providing periodic updates
+   * Blocking call for purging all offline helix instances. Provides boilerplate code for providing periodic updates
    * and sending a GTE if it's an unexpected amount of time.
-   *
+   * <p>
    * All previous helix instances should be purged on startup. Gobblin task runners are stateless from helix
    * perspective because all important state is persisted separately in Workunit State Store or Watermark store.
    */
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 7955c791a..5b6610f81 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -333,7 +333,7 @@ public class YarnService extends AbstractIdleService {
       // Record that this container was explicitly released so that a new one is not spawned to replace it
       // Put the container id in the releasedContainerCache before releasing it so that handleContainerCompletion()
       // can check for the container id and skip spawning a replacement container.
-      // Note that this is best effort since these are asynchronous operations and a container may abort concurrently
+      // Note that this is the best effort since these are asynchronous operations and a container may abort concurrently
       // with the release call. So in some cases a replacement container may have already been spawned before
       // the container is put into the black list.
       this.releasedContainerCache.put(container.getId(), "");
@@ -453,7 +453,7 @@ public class YarnService extends AbstractIdleService {
   /**
    * Request an allocation of containers. If numTargetContainers is larger than the max of current and expected number
    * of containers then additional containers are requested.
-   *
+   * <p>
    * If numTargetContainers is less than the current number of allocated containers then release free containers.
    * Shrinking is relative to the number of currently allocated containers since it takes time for containers
    * to be allocated and assigned work and we want to avoid releasing a container prematurely before it is assigned
@@ -709,7 +709,7 @@ public class YarnService extends AbstractIdleService {
    * Handle the completion of a container. A new container will be requested to replace the one
    * that just exited. Depending on the exit status and if container host affinity is enabled,
    * the new container may or may not try to be started on the same node.
-   *
+   * <p>
    * A container completes in either of the following conditions: 1) some error happens in the
    * container and caused the container to exit, 2) the container gets killed due to some reason,
    * for example, if it runs over the allowed amount of virtual or physical memory, 3) the gets
@@ -860,11 +860,11 @@ public class YarnService extends AbstractIdleService {
    * Get the number of matching container requests for the specified resource memory and cores.
    * Due to YARN-1902 and YARN-660, this API is not 100% accurate. {@link AMRMClientCallbackHandler#onContainersAllocated(List)}
    * contains logic for best effort clean up of requests, and the resource tend to match the allocated container. So in practice the count is pretty accurate.
-   *
+   * <p>
    * This API call gets the count of container requests for containers that are > resource if there is no request with the exact same resource
    * The RM can return containers that are larger (because of normalization etc).
    * Container may be larger by memory or cpu (e.g. container (1000M, 3cpu) can fit request (1000M, 1cpu) or request (500M, 3cpu).
-   *
+   * <p>
    * Thankfully since each helix tag / resource has a different priority, matching requests for one helix tag / resource
    * have complete isolation from another helix tag / resource
    */

Reply via email to