Repository: beam Updated Branches: refs/heads/master 17bc3b140 -> 440c7d45b
Simplified ByteBuddyOnTimerInvokerFactory Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c8d98336 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c8d98336 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c8d98336 Branch: refs/heads/master Commit: c8d983363efd3f3d93825ecc8e8abae2dfa4e008 Parents: 17bc3b1 Author: Innocent Djiofack <[email protected]> Authored: Wed Jun 28 22:15:11 2017 -0400 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jul 6 21:46:53 2017 -0700 ---------------------------------------------------------------------- .../reflect/ByteBuddyOnTimerInvokerFactory.java | 73 ++++++++------------ .../reflect/OnTimerMethodSpecifier.java | 37 ++++++++++ 2 files changed, 65 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c8d98336/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java index e031337..5e31f2e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.transforms.reflect; + import com.google.common.base.CharMatcher; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; @@ -61,13 +62,14 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory { @SuppressWarnings("unchecked") Class<? extends DoFn<?, ?>> fnClass = (Class<? extends DoFn<?, ?>>) fn.getClass(); - try { - Constructor<?> constructor = constructorCache.get(fnClass).get(timerId); - @SuppressWarnings("unchecked") - OnTimerInvoker<InputT, OutputT> invoker = + OnTimerMethodSpecifier onTimerMethodSpecifier = + OnTimerMethodSpecifier.forClassAndTimerId(fnClass, timerId); + Constructor<?> constructor = constructorCache.get(onTimerMethodSpecifier); + + OnTimerInvoker<InputT, OutputT> invoker = (OnTimerInvoker<InputT, OutputT>) constructor.newInstance(fn); - return invoker; + return invoker; } catch (InstantiationException | IllegalAccessException | IllegalArgumentException @@ -97,50 +99,31 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory { private static final String FN_DELEGATE_FIELD_NAME = "delegate"; /** - * A cache of constructors of generated {@link OnTimerInvoker} classes, keyed by {@link DoFn} - * class and then by {@link TimerId}. + * A cache of constructors of generated {@link OnTimerInvoker} classes, + * keyed by {@link OnTimerMethodSpecifier}. * * <p>Needed because generating an invoker class is expensive, and to avoid generating an * excessive number of classes consuming PermGen memory in Java's that still have PermGen. */ - private final LoadingCache<Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>> - constructorCache = - CacheBuilder.newBuilder() - .build( - new CacheLoader< - Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>>() { - @Override - public LoadingCache<String, Constructor<?>> load( - final Class<? extends DoFn<?, ?>> fnClass) throws Exception { - return CacheBuilder.newBuilder().build(new OnTimerConstructorLoader(fnClass)); - } - }); - - /** - * A cache loader fixed to a particular {@link DoFn} class that loads constructors for the - * invokers for its {@link OnTimer @OnTimer} methods. - */ - private static class OnTimerConstructorLoader extends CacheLoader<String, Constructor<?>> { - - private final DoFnSignature signature; - - public OnTimerConstructorLoader(Class<? extends DoFn<?, ?>> clazz) { - this.signature = DoFnSignatures.getSignature(clazz); - } - - @Override - public Constructor<?> load(String timerId) throws Exception { - Class<? extends OnTimerInvoker<?, ?>> invokerClass = - generateOnTimerInvokerClass(signature, timerId); - try { - return invokerClass.getConstructor(signature.fnClass()); - } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) { - throw new RuntimeException(e); - } - } - } - - /** + private final LoadingCache<OnTimerMethodSpecifier, Constructor<?>> constructorCache = + CacheBuilder.newBuilder().build( + new CacheLoader<OnTimerMethodSpecifier, Constructor<?>>() { + @Override + public Constructor<?> load(final OnTimerMethodSpecifier onTimerMethodSpecifier) + throws Exception { + DoFnSignature signature = + DoFnSignatures.getSignature(onTimerMethodSpecifier.fnClass()); + Class<? extends OnTimerInvoker<?, ?>> invokerClass = + generateOnTimerInvokerClass(signature, onTimerMethodSpecifier.timerId()); + try { + return invokerClass.getConstructor(signature.fnClass()); + } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) { + throw new RuntimeException(e); + } + + } + }); + /** * Generates a {@link OnTimerInvoker} class for the given {@link DoFnSignature} and {@link * TimerId}. */ http://git-wip-us.apache.org/repos/asf/beam/blob/c8d98336/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java new file mode 100644 index 0000000..edf7e3c --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.reflect; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.transforms.DoFn; + +/** + * Used by {@link ByteBuddyOnTimerInvokerFactory} to Dynamically generate + * {@link OnTimerInvoker} instances for invoking a particular + * {@link DoFn.TimerId} on a particular {@link DoFn}. + */ + +@AutoValue +abstract class OnTimerMethodSpecifier { + public abstract Class<? extends DoFn<?, ?>> fnClass(); + public abstract String timerId(); + public static OnTimerMethodSpecifier + forClassAndTimerId(Class<? extends DoFn<?, ?>> fnClass, String timerId){ + return new AutoValue_OnTimerMethodSpecifier(fnClass, timerId); + } +}
