This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ebdb03fa4 [GOBBLIN-1817] change some deprecated code and fix minor codestyle (#3678)
ebdb03fa4 is described below
commit ebdb03fa48238a9cf5b53bc77683e80af8756697
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Fri Apr 14 13:23:00 2023 -0700
[GOBBLIN-1817] change some deprecated code and fix minor codestyle (#3678)
* change some deprecated code and fix minor codestyle
* fix unit tests
---------
Co-authored-by: Arjun <[email protected]>
---
.../gobblin/converter/serde/HiveSerDeConverter.java | 10 ++++++----
.../writer/HiveWritableHdfsDataWriterBuilder.java | 3 ++-
.../java/org/apache/gobblin/hive/HiveSerDeWrapper.java | 17 +++++++++++++----
.../gobblin/yarn/HelixInstancePurgerWithMetrics.java | 4 ++--
.../main/java/org/apache/gobblin/yarn/YarnService.java | 10 +++++-----
5 files changed, 28 insertions(+), 16 deletions(-)
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/converter/serde/HiveSerDeConverter.java
b/gobblin-core/src/main/java/org/apache/gobblin/converter/serde/HiveSerDeConverter.java
index 4e696703f..1f23a9462 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/converter/serde/HiveSerDeConverter.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/converter/serde/HiveSerDeConverter.java
@@ -23,8 +23,10 @@ import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -69,8 +71,8 @@ import org.apache.gobblin.util.HadoopUtils;
@Slf4j
public class HiveSerDeConverter extends InstrumentedConverter<Object, Object,
Writable, Writable> {
- private SerDe serializer;
- private SerDe deserializer;
+ private Serializer serializer;
+ private Deserializer deserializer;
@Override
public HiveSerDeConverter init(WorkUnitState state) {
@@ -78,8 +80,8 @@ public class HiveSerDeConverter extends
InstrumentedConverter<Object, Object, Wr
Configuration conf = HadoopUtils.getConfFromState(state);
try {
- this.serializer = HiveSerDeWrapper.getSerializer(state).getSerDe();
- this.deserializer = HiveSerDeWrapper.getDeserializer(state).getSerDe();
+ this.serializer = (Serializer)
HiveSerDeWrapper.getSerializer(state).getSerDe();
+ this.deserializer = (Deserializer)
HiveSerDeWrapper.getDeserializer(state).getSerDe();
this.deserializer.initialize(conf, state.getProperties());
setColumnsIfPossible(state);
this.serializer.initialize(conf, state.getProperties());
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterBuilder.java
b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterBuilder.java
index 5dba8617a..6b056848e 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterBuilder.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/writer/HiveWritableHdfsDataWriterBuilder.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.writer;
import java.io.IOException;
import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.io.Writable;
import com.google.common.base.Preconditions;
@@ -54,7 +55,7 @@ public class HiveWritableHdfsDataWriterBuilder<S> extends
FsDataWriterBuilder<S,
if (!properties.contains(WRITER_WRITABLE_CLASS) ||
!properties.contains(WRITER_OUTPUT_FORMAT_CLASS)) {
HiveSerDeWrapper serializer = HiveSerDeWrapper.getSerializer(properties);
- properties.setProp(WRITER_WRITABLE_CLASS,
serializer.getSerDe().getSerializedClass().getName());
+ properties.setProp(WRITER_WRITABLE_CLASS, ((Serializer)
serializer.getSerDe()).getSerializedClass().getName());
properties.setProp(WRITER_OUTPUT_FORMAT_CLASS,
serializer.getOutputFormatClassName());
}
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveSerDeWrapper.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveSerDeWrapper.java
index fea2e99d1..f01faa36f 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveSerDeWrapper.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveSerDeWrapper.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.hive;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedSerde;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
@@ -39,6 +41,7 @@ import com.google.common.base.Preconditions;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.Either;
/**
@@ -88,7 +91,7 @@ public class HiveSerDeWrapper {
}
}
- private Optional<SerDe> serDe = Optional.absent();
+ private Optional<Either<AbstractSerDe, VectorizedSerde>> serDe =
Optional.absent();
private final String serDeClassName;
private final String inputFormatClassName;
private final String outputFormatClassName;
@@ -107,15 +110,21 @@ public class HiveSerDeWrapper {
* Get the {@link SerDe} instance associated with this {@link
HiveSerDeWrapper}.
* This method performs lazy initialization.
*/
- public SerDe getSerDe() throws IOException {
+ public Object getSerDe() throws IOException {
if (!this.serDe.isPresent()) {
try {
- this.serDe =
Optional.of(SerDe.class.cast(Class.forName(this.serDeClassName).newInstance()));
+ Object serde = Class.forName(this.serDeClassName).newInstance();
+ if (serde instanceof OrcSerde) {
+ this.serDe =
Optional.of(Either.right(VectorizedSerde.class.cast(serde)));
+ } else {
+ this.serDe =
Optional.of(Either.left(AbstractSerDe.class.cast(serde)));
+ }
} catch (Throwable t) {
throw new IOException("Failed to instantiate SerDe " +
this.serDeClassName, t);
}
}
- return this.serDe.get();
+
+ return this.serDe.get().get();
}
/**
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/HelixInstancePurgerWithMetrics.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/HelixInstancePurgerWithMetrics.java
index efb6644b4..9ac0ccad7 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/HelixInstancePurgerWithMetrics.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/HelixInstancePurgerWithMetrics.java
@@ -42,9 +42,9 @@ public class HelixInstancePurgerWithMetrics {
/**
- * Blocking call for purging all offline helix instances. Provides boiler
plate code for providing periodic updates
+ * Blocking call for purging all offline helix instances. Provides
boilerplate code for providing periodic updates
* and sending a GTE if it's an unexpected amount of time.
- *
+ * <p>
* All previous helix instances should be purged on startup. Gobblin task
runners are stateless from helix
* perspective because all important state is persisted separately in
Workunit State Store or Watermark store.
*/
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 7955c791a..5b6610f81 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -333,7 +333,7 @@ public class YarnService extends AbstractIdleService {
// Record that this container was explicitly released so that a new one
is not spawned to replace it
// Put the container id in the releasedContainerCache before releasing
it so that handleContainerCompletion()
// can check for the container id and skip spawning a replacement
container.
- // Note that this is best effort since these are asynchronous operations
and a container may abort concurrently
+ // Note that this is best-effort since these are asynchronous
operations and a container may abort concurrently
// with the release call. So in some cases a replacement container may
have already been spawned before
// the container is put into the black list.
this.releasedContainerCache.put(container.getId(), "");
@@ -453,7 +453,7 @@ public class YarnService extends AbstractIdleService {
/**
* Request an allocation of containers. If numTargetContainers is larger
than the max of current and expected number
* of containers then additional containers are requested.
- *
+ * <p>
* If numTargetContainers is less than the current number of allocated
containers then release free containers.
* Shrinking is relative to the number of currently allocated containers
since it takes time for containers
* to be allocated and assigned work and we want to avoid releasing a
container prematurely before it is assigned
@@ -709,7 +709,7 @@ public class YarnService extends AbstractIdleService {
* Handle the completion of a container. A new container will be requested
to replace the one
* that just exited. Depending on the exit status and if container host
affinity is enabled,
* the new container may or may not try to be started on the same node.
- *
+ * <p>
* A container completes in either of the following conditions: 1) some
error happens in the
* container and caused the container to exit, 2) the container gets killed
due to some reason,
* for example, if it runs over the allowed amount of virtual or physical
memory, 3) the gets
@@ -860,11 +860,11 @@ public class YarnService extends AbstractIdleService {
* Get the number of matching container requests for the specified resource
memory and cores.
* Due to YARN-1902 and YARN-660, this API is not 100% accurate. {@link
AMRMClientCallbackHandler#onContainersAllocated(List)}
* contains logic for best effort clean up of requests, and the resource
tend to match the allocated container. So in practice the count is pretty
accurate.
- *
+ * <p>
* This API call gets the count of container requests for containers that
are > resource if there is no request with the exact same resource
* The RM can return containers that are larger (because of normalization
etc).
* Container may be larger by memory or cpu (e.g. container (1000M, 3cpu)
can fit request (1000M, 1cpu) or request (500M, 3cpu).
- *
+ * <p>
* Thankfully since each helix tag / resource has a different priority,
matching requests for one helix tag / resource
* have complete isolation from another helix tag / resource
*/