[ 
https://issues.apache.org/jira/browse/BEAM-5037?focusedWorklogId=128230&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-128230
 ]

ASF GitHub Bot logged work on BEAM-5037:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Jul/18 17:53
            Start Date: 27/Jul/18 17:53
    Worklog Time Spent: 10m 
      Work Description: lgajowy closed pull request #6083: [BEAM-5037] fix 
synthetic options
URL: https://github.com/apache/beam/pull/6083
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIO.java
 
b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIO.java
index d9f652ace9d..71b146d43fc 100644
--- 
a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIO.java
+++ 
b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIO.java
@@ -186,7 +186,7 @@ public SyntheticSourceReader createReader(PipelineOptions 
pipelineOptions) {
       for (int i = 0; i < relativeSizes.length; ++i) {
         relativeSizes[i] =
             sourceOptions.bundleSizeDistribution.sample(
-                sourceOptions.hashFunction.hashInt(i).asLong());
+                sourceOptions.hashFunction().hashInt(i).asLong());
       }
 
       // Generate offset ranges proportional to the relative sizes.
@@ -307,7 +307,7 @@ public Record genRecord(long position) {
       // close to each other. To make seeds fed into the Random objects 
unrelated,
       // we use a hashing function to map the position to its corresponding 
hashcode,
       // and use the hashcode as a seed to feed into the Random object.
-      long hashCodeOfPosition = hashFunction.hashLong(position).asLong();
+      long hashCodeOfPosition = hashFunction().hashLong(position).asLong();
       return new Record(genKvPair(hashCodeOfPosition), 
nextDelay(hashCodeOfPosition));
     }
 
@@ -402,7 +402,7 @@ protected boolean advanceImpl() {
       currentKvPair = record.kv;
       // TODO: add a separate distribution for the sleep time of reading the 
first record
       // (e.g.,"open" the files).
-      long hashCodeOfVal = 
options.hashFunction.hashBytes(currentKvPair.getValue()).asLong();
+      long hashCodeOfVal = 
options.hashFunction().hashBytes(currentKvPair.getValue()).asLong();
       Random random = new Random(hashCodeOfVal);
       SyntheticUtils.delay(
           record.sleepMsec, options.cpuUtilizationInMixedDelay, 
options.delayType, random);
diff --git 
a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticOptions.java
 
b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticOptions.java
index 9e91c67c683..7cb9b94a38d 100644
--- 
a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticOptions.java
+++ 
b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticOptions.java
@@ -61,9 +61,6 @@
     MIXED,
   }
 
-  /** Mapper for (de)serializing JSON. */
-  private static final ObjectMapper MAPPER = new ObjectMapper();
-
   /**
    * Wrapper over a distribution. Unfortunately commons-math does not provide 
a common interface
    * over both RealDistribution and IntegerDistribution, and we sometimes need 
one and sometimes the
@@ -160,7 +157,7 @@ public Object getDistribution() {
    * The hash function is used to generate seeds that are fed into the random 
number generators and
    * the sleep time distributions.
    */
-  @JsonIgnore public transient HashFunction hashFunction;
+  @JsonIgnore private transient HashFunction hashFunction;
 
   /**
    * SyntheticOptions supports several delay distributions including uniform, 
normal, exponential,
@@ -217,7 +214,15 @@ public Object getDistribution() {
   @JsonDeserialize
   public void setSeed(int seed) {
     this.seed = seed;
-    this.hashFunction = Hashing.murmur3_128(seed);
+  }
+
+  public HashFunction hashFunction() {
+    // due to field's transiency initialize when null.
+    if (hashFunction == null) {
+      this.hashFunction = Hashing.murmur3_128(seed);
+    }
+
+    return hashFunction;
   }
 
   static class SamplerDeserializer extends JsonDeserializer<Sampler> {
@@ -298,7 +303,6 @@ public void validate() {
         hotKeyFraction >= 0,
         "hotKeyFraction should be a non-negative number, but found %s",
         hotKeyFraction);
-    checkArgument(hashFunction != null, "hashFunction hasn't been 
initialized.");
     if (hotKeyFraction > 0) {
       int intBytes = Integer.SIZE / 8;
       checkArgument(
@@ -314,7 +318,7 @@ public void validate() {
   @Override
   public String toString() {
     try {
-      return MAPPER.writeValueAsString(this);
+      return new ObjectMapper().writeValueAsString(this);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 128230)
    Time Spent: 1h  (was: 50m)

> HashFunction is not initialized in SyntheticOptions
> ---------------------------------------------------
>
>                 Key: BEAM-5037
>                 URL: https://issues.apache.org/jira/browse/BEAM-5037
>             Project: Beam
>          Issue Type: Bug
>          Components: testing
>            Reporter: Lukasz Gajowy
>            Assignee: Lukasz Gajowy
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> This is due to the fact that the field is transient, hence it is not 
> serialized and is not initialized again after deserialization. We need some 
> other way of initializing it, immune to the field's transiency.
> Stacktrace:
> {code:java}
> Class org.apache.beam.sdk.io.synthetic.GroupByKeyLoadIT
> all > org.apache.beam.sdk.io.synthetic > GroupByKeyLoadIT
> 1
> tests
> 1
> failures
> 0
> ignored
> 0.050s
> duration
> 0%
> successful
> Failed tests
> Tests
> Standard error
> groupByKeyLoadTest
> java.lang.IllegalArgumentException: hashFunction hasn't been initialized.
>       at 
> com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
>       at 
> org.apache.beam.sdk.io.synthetic.SyntheticOptions.validate(SyntheticOptions.java:301)
>       at 
> org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO$SyntheticSourceOptions.validate(SyntheticBoundedIO.java:285)
>       at 
> org.apache.beam.sdk.io.synthetic.SyntheticBoundedIO$SyntheticBoundedSource.validate(SyntheticBoundedIO.java:119)
>       at org.apache.beam.sdk.io.Read$Bounded.expand(Read.java:95)
>       at org.apache.beam.sdk.io.Read$Bounded.expand(Read.java:85)
>       at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>       at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>       at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
>       at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:167)
>       at 
> org.apache.beam.sdk.io.synthetic.GroupByKeyLoadIT.groupByKeyLoadTest(GroupByKeyLoadIT.java:81)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at 
> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
>       at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>       at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>       at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>       at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
>       at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>       at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>       at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
>       at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>       at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>       at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>       at com.sun.proxy.$Proxy1.processTestClass(Unknown Source)
>       at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:109)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>       at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>       at 
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155)
>       at 
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:137)
>       at 
> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
>       at 
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
>       at 
> org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at 
> org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
>       at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to