lukecwik commented on code in PR #22162:
URL: https://github.com/apache/beam/pull/22162#discussion_r916070060
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java:
##########
@@ -1246,4 +1263,112 @@ public void testGetOptionNameFromMethod() throws
NoSuchMethodException {
handler.as(BaseOptions.class);
assertEquals("foo",
handler.getOptionName(BaseOptions.class.getMethod("getFoo")));
}
+
+ private DynamicType.Unloaded<? extends PipelineOptions> spinNewInterface(int
methodNumber) {
+ return new ByteBuddy()
+ .makeInterface(PipelineOptions.class)
+ .defineMethod("getDynamicMethod" + methodNumber, String.class,
Visibility.PUBLIC)
+ .withoutCode()
+ .defineMethod("setDynamicMethod" + methodNumber, Void.TYPE,
Visibility.PUBLIC)
+ .withParameters(String.class)
+ .withoutCode()
+ .make();
+ }
+
+ private Map<Integer, Class<? extends PipelineOptions>> loadAllInterfaces(
+ int numInterfaces, ClassLoader classLoader) {
+ List<DynamicType.Unloaded<? extends PipelineOptions>> dynamicInterfaces =
+ IntStream.range(0, numInterfaces)
+ .mapToObj(this::spinNewInterface)
+ .collect(Collectors.toList());
+
+ DynamicType.Loaded<Object> root =
+ new
ByteBuddy().subclass(Object.class).make().include(dynamicInterfaces).load(classLoader);
+
+ Map<TypeDescription, Class<?>> loadedInterfaces =
root.getLoadedAuxiliaryTypes();
+ Map<Integer, Class<? extends PipelineOptions>> result = Maps.newHashMap();
+
+ IntStream.range(0, numInterfaces)
+ .forEach(
+ i -> {
+ DynamicType.Unloaded<? extends PipelineOptions> iface =
dynamicInterfaces.get(i);
+ Class<?> clazz =
loadedInterfaces.get(iface.getTypeDescription());
+ result.put(i, (Class<? extends PipelineOptions>) clazz);
+ });
+
+ return result;
+ }
+
+ @Test
+ public void testConcurrency() throws Exception {
+ int numInterfaces = 100;
+ int numWorkers = 10;
+ int numReaders = 10;
+ int step = numInterfaces / numWorkers;
+
+ ProxyInvocationHandler handler = new
ProxyInvocationHandler(Maps.newHashMap());
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ Map<Integer, Class<? extends PipelineOptions>> ifaces =
loadAllInterfaces(numInterfaces, cl);
+ CountDownLatch startWaiter = new CountDownLatch(1);
+ CountDownLatch done = new CountDownLatch(numWorkers);
+
+ ExecutorService executor = Executors.newFixedThreadPool(numWorkers +
numReaders);
+ List<Future<?>> futs = Lists.newArrayList();
+
+ // launch a `numWorkers` concurrent "writers" that will try to cast the
proxy handler to various
+ // interfaces and set a value on them.
+ for (int start = 0; start < numInterfaces; start += step) {
+ final int s = start;
+ final int end = start + step;
+ Callable<Void> worker =
+ () -> {
+ startWaiter.await();
+ for (int i = s; i < end; i++) {
+ Class<? extends PipelineOptions> iface = ifaces.get(i);
+ Method setter = iface.getDeclaredMethod("setDynamicMethod" + i,
String.class);
+ PipelineOptions opt1 = handler.as(iface);
+ setter.invoke(opt1, "test-" + i);
+ }
+ done.countDown();
+ return null;
+ };
+ futs.add(executor.submit(worker));
+ }
+
+ // launch concurrent readers that call `toString` and serializes the
options, making sure they
+ // don't fail.
+ for (int i = 0; i < 10; i++) {
+ Callable<Void> worker =
+ () -> {
+ while (true) {
+ if (done.await(0, TimeUnit.MILLISECONDS)) {
Review Comment:
```suggestion
if (done.getCount() == 0) {
```
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java:
##########
@@ -1246,4 +1263,116 @@ public void testGetOptionNameFromMethod() throws
NoSuchMethodException {
handler.as(BaseOptions.class);
assertEquals("foo",
handler.getOptionName(BaseOptions.class.getMethod("getFoo")));
}
+
+ private DynamicType.Unloaded<? extends PipelineOptions> spinNewInterface(int
methodNumber) {
+ return new ByteBuddy()
+ .makeInterface(PipelineOptions.class)
+ .defineMethod("getDynamicMethod" + methodNumber, String.class,
Visibility.PUBLIC)
+ .withoutCode()
+ .defineMethod("setDynamicMethod" + methodNumber, Void.TYPE,
Visibility.PUBLIC)
+ .withParameters(String.class)
+ .withoutCode()
+ .make();
+ }
+
+ private Map<Integer, Class<? extends PipelineOptions>> loadAllInterfaces(
+ int numInterfaces, ClassLoader classLoader) {
+ List<DynamicType.Unloaded<? extends PipelineOptions>> dynamicInterfaces =
+ IntStream.range(0, numInterfaces)
+ .mapToObj(this::spinNewInterface)
+ .collect(Collectors.toList());
+
+ DynamicType.Loaded<Object> root =
+ new
ByteBuddy().subclass(Object.class).make().include(dynamicInterfaces).load(classLoader);
+
+ Map<TypeDescription, Class<?>> loadedInterfaces =
root.getLoadedAuxiliaryTypes();
+ Map<Integer, Class<? extends PipelineOptions>> result = Maps.newHashMap();
+
+ IntStream.range(0, numInterfaces)
+ .forEach(
+ i -> {
+ DynamicType.Unloaded<? extends PipelineOptions> iface =
dynamicInterfaces.get(i);
+ Class<?> clazz =
loadedInterfaces.get(iface.getTypeDescription());
+ result.put(i, (Class<? extends PipelineOptions>) clazz);
+ });
+
+ return result;
+ }
+
+ @Test
+ public void testConcurrency() throws Exception {
+ int numInterfaces = 100;
+ int numWorkers = 10;
+ int numReaders = 10;
+ int step = numInterfaces / numWorkers;
+
+ ProxyInvocationHandler handler = new
ProxyInvocationHandler(Maps.newHashMap());
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ Map<Integer, Class<? extends PipelineOptions>> ifaces =
loadAllInterfaces(numInterfaces, cl);
+ CountDownLatch startWaiter = new CountDownLatch(1);
+ CountDownLatch done = new CountDownLatch(numWorkers);
+
+ ExecutorService executor = Executors.newFixedThreadPool(numWorkers +
numReaders);
+ List<Future<?>> futs = Lists.newArrayList();
+
+ // launch a `numWorkers` concurrent "writers" that will try to cast the
proxy handler to various
Review Comment:
```suggestion
// launch `numWorkers` concurrent "writers" that will try to cast the
proxy handler to various
```
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java:
##########
@@ -1246,4 +1263,112 @@ public void testGetOptionNameFromMethod() throws
NoSuchMethodException {
handler.as(BaseOptions.class);
assertEquals("foo",
handler.getOptionName(BaseOptions.class.getMethod("getFoo")));
}
+
+ private DynamicType.Unloaded<? extends PipelineOptions> spinNewInterface(int
methodNumber) {
+ return new ByteBuddy()
+ .makeInterface(PipelineOptions.class)
+ .defineMethod("getDynamicMethod" + methodNumber, String.class,
Visibility.PUBLIC)
+ .withoutCode()
+ .defineMethod("setDynamicMethod" + methodNumber, Void.TYPE,
Visibility.PUBLIC)
+ .withParameters(String.class)
+ .withoutCode()
+ .make();
+ }
+
+ private Map<Integer, Class<? extends PipelineOptions>> loadAllInterfaces(
+ int numInterfaces, ClassLoader classLoader) {
+ List<DynamicType.Unloaded<? extends PipelineOptions>> dynamicInterfaces =
+ IntStream.range(0, numInterfaces)
+ .mapToObj(this::spinNewInterface)
+ .collect(Collectors.toList());
+
+ DynamicType.Loaded<Object> root =
+ new
ByteBuddy().subclass(Object.class).make().include(dynamicInterfaces).load(classLoader);
+
+ Map<TypeDescription, Class<?>> loadedInterfaces =
root.getLoadedAuxiliaryTypes();
+ Map<Integer, Class<? extends PipelineOptions>> result = Maps.newHashMap();
+
+ IntStream.range(0, numInterfaces)
+ .forEach(
+ i -> {
+ DynamicType.Unloaded<? extends PipelineOptions> iface =
dynamicInterfaces.get(i);
+ Class<?> clazz =
loadedInterfaces.get(iface.getTypeDescription());
+ result.put(i, (Class<? extends PipelineOptions>) clazz);
+ });
+
+ return result;
+ }
+
+ @Test
+ public void testConcurrency() throws Exception {
+ int numInterfaces = 100;
+ int numWorkers = 10;
+ int numReaders = 10;
+ int step = numInterfaces / numWorkers;
+
+ ProxyInvocationHandler handler = new
ProxyInvocationHandler(Maps.newHashMap());
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ Map<Integer, Class<? extends PipelineOptions>> ifaces =
loadAllInterfaces(numInterfaces, cl);
+ CountDownLatch startWaiter = new CountDownLatch(1);
+ CountDownLatch done = new CountDownLatch(numWorkers);
+
+ ExecutorService executor = Executors.newFixedThreadPool(numWorkers +
numReaders);
+ List<Future<?>> futs = Lists.newArrayList();
+
+ // launch a `numWorkers` concurrent "writers" that will try to cast the
proxy handler to various
+ // interfaces and set a value on them.
+ for (int start = 0; start < numInterfaces; start += step) {
+ final int s = start;
+ final int end = start + step;
+ Callable<Void> worker =
+ () -> {
+ startWaiter.await();
+ for (int i = s; i < end; i++) {
+ Class<? extends PipelineOptions> iface = ifaces.get(i);
+ Method setter = iface.getDeclaredMethod("setDynamicMethod" + i,
String.class);
+ PipelineOptions opt1 = handler.as(iface);
+ setter.invoke(opt1, "test-" + i);
+ }
+ done.countDown();
+ return null;
+ };
+ futs.add(executor.submit(worker));
+ }
+
+ // launch concurrent readers that call `toString` and serializes the
options, making sure they
+ // don't fail.
+ for (int i = 0; i < 10; i++) {
+ Callable<Void> worker =
+ () -> {
+ while (true) {
+ if (done.await(0, TimeUnit.MILLISECONDS)) {
+ return null;
+ }
+ assertNotNull(handler.toString());
+ PipelineOptionsFactory.MAPPER.writeValue(
+ ByteStreams.nullOutputStream(),
handler.as(PipelineOptions.class));
+ }
+ };
+ futs.add(executor.submit(worker));
+ }
+
+ // wait for everything to finish
+ startWaiter.countDown();
+ done.await();
Review Comment:
```suggestion
```
Blocking on the futures makes this redundant.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]