dengzhhu653 commented on a change in pull request #2693:
URL: https://github.com/apache/hive/pull/2693#discussion_r752141014
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
##########
@@ -18,82 +18,66 @@
package org.apache.hadoop.hive.ql.exec.mr;
+import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.hive.ql.metadata.HiveException;
/**
- * ObjectCache. No-op implementation on MR we don't have a means to reuse
- * Objects between runs of the same task.
+ * ObjectCache. Simple implementation on MR we don't have a means to reuse
+ * Objects between runs of the same task, this acts as a local cache.
*
*/
public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache
{
private static final Logger LOG =
LoggerFactory.getLogger(ObjectCache.class.getName());
+ private final Map<String, Object> cache = new ConcurrentHashMap<>();
+
+ private static ExecutorService staticPool = Executors.newCachedThreadPool();
+
@Override
public void release(String key) {
// nothing to do
LOG.debug("{} no longer needed", key);
+ cache.remove(key);
}
@Override
public <T> T retrieve(String key) throws HiveException {
- return retrieve(key, null);
+ return (T) cache.get(key);
}
@Override
public <T> T retrieve(String key, Callable<T> fn) throws HiveException {
+ T value = (T) cache.get(key);
+ if (value != null || fn == null) {
+ return value;
+ }
try {
LOG.debug("Creating {}", key);
- return fn.call();
+ value = fn.call();
} catch (Exception e) {
throw new HiveException(e);
}
+ T previous = (T) cache.putIfAbsent(key, value);
+ return previous != null ? previous : value;
}
@Override
public <T> Future<T> retrieveAsync(String key, Callable<T> fn) throws
HiveException {
- final T value = retrieve(key, fn);
-
- return new Future<T>() {
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return false;
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public boolean isDone() {
- return true;
- }
-
- @Override
- public T get() throws InterruptedException, ExecutionException {
- return value;
- }
-
- @Override
- public T get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException,
- TimeoutException {
- return value;
- }
- };
+ return staticPool.submit((Callable)() -> retrieve(key, fn));
Review comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]