[
https://issues.apache.org/jira/browse/BEAM-891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650533#comment-15650533
]
Stas Levin edited comment on BEAM-891 at 11/9/16 12:28 PM:
-----------------------------------------------------------
We can indeed try syncing the following block in {{SparkRuntimeContext}}:
{code:java}
metricsSystem.removeSource(aggregatorMetricSource);
metricsSystem.registerSource(aggregatorMetricSource);
{code}
But since other calls to {{MetricsSystem#removeSource}} and
{{MetricsSystem#registerSource}} are not synced (e.g., those made by Spark
itself), this kind of synchronisation will only help if the so-called "race"
lies in our Beam code. If the race is with Spark's internal calls, syncing the
above block on our side will not help much.
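To make the first option concrete, here is a minimal sketch of guarding the remove/register pair with a shared lock. The names {{FakeMetricsSystem}}, {{LOCK}}, {{reRegister}} and {{runConcurrently}} are made up for illustration; the stand-in class is not Spark's {{MetricsSystem}}, it only wraps an unsynchronized list. The lock protects just the callers that go through {{reRegister}}, which is the limitation described above.

```java
import java.util.ArrayList;
import java.util.List;

final class SyncedReRegistration {

    /** Hypothetical stand-in for Spark's MetricsSystem: an unsynchronized list of source names. */
    static final class FakeMetricsSystem {
        private final List<String> sources = new ArrayList<>();

        void registerSource(String name) { sources.add(name); }

        void removeSource(String name) { sources.remove(name); }

        int sourceCount() { return sources.size(); }
    }

    // Assumption: every Beam-side caller shares this one lock.
    private static final Object LOCK = new Object();

    /** Removes and re-registers the source as one critical section. */
    static void reRegister(FakeMetricsSystem metricsSystem, String source) {
        synchronized (LOCK) {
            metricsSystem.removeSource(source);
            metricsSystem.registerSource(source);
        }
    }

    /** Runs reRegister from several threads and returns the final number of sources. */
    static int runConcurrently(int threads, int iterations) {
        FakeMetricsSystem metricsSystem = new FakeMetricsSystem();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < iterations; i++) {
                    reRegister(metricsSystem, "aggregators");
                }
            });
            workers.add(worker);
            worker.start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return metricsSystem.sourceCount();
    }
}
```

In this sketch {{SyncedReRegistration.runConcurrently(8, 1000)}} returns 1, because every access to the list happens under {{LOCK}}. A caller that touched the list without taking {{LOCK}} (the way Spark's internal calls would) could still corrupt it.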
Another option would be to replace the above block with this one (which
requires some minor changes to {{AggregatorMetricSource}}):
{code:java}
// Register only if no source with this name is registered yet.
if (metricsSystem.getSourcesByName(AggregatorMetricSource.NAME).isEmpty()) {
  metricsSystem.registerSource(aggregatorMetricSource);
}
{code}
This eliminates {{SparkRuntimeContext}}'s call to
{{MetricsSystem#removeSource}} altogether.
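The register-if-absent idea can be sketched in isolation as follows. {{FakeSource}}, {{FakeMetricsSystem}} and {{registerIfAbsent}} are hypothetical names used only for this illustration; the stand-in returns a {{java.util.List}}, whereas Spark's {{getSourcesByName}} returns a Scala {{Seq}}. Note that the check and the registration are still two separate steps, so this avoids {{removeSource}} but is not atomic by itself.

```java
import java.util.ArrayList;
import java.util.List;

final class RegisterIfAbsent {

    /** Hypothetical stand-in for a metrics source, identified by its name. */
    static final class FakeSource {
        final String name;

        FakeSource(String name) { this.name = name; }
    }

    /** Hypothetical stand-in for Spark's MetricsSystem, keeping sources in a plain list. */
    static final class FakeMetricsSystem {
        private final List<FakeSource> sources = new ArrayList<>();

        void registerSource(FakeSource source) { sources.add(source); }

        List<FakeSource> getSourcesByName(String name) {
            List<FakeSource> matches = new ArrayList<>();
            for (FakeSource source : sources) {
                if (source.name.equals(name)) {
                    matches.add(source);
                }
            }
            return matches;
        }
    }

    /** Registers the source only if no source with the same name exists; returns true if it registered. */
    static boolean registerIfAbsent(FakeMetricsSystem metricsSystem, FakeSource source) {
        if (metricsSystem.getSourcesByName(source.name).isEmpty()) {
            metricsSystem.registerSource(source);
            return true;
        }
        return false;
    }
}
```

Calling {{registerIfAbsent}} twice with the same source registers it once; the second call finds the existing entry and does nothing.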
> Flake in Spark metrics library?
> -------------------------------
>
> Key: BEAM-891
> URL: https://issues.apache.org/jira/browse/BEAM-891
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Reporter: Daniel Halperin
> Assignee: Stas Levin
>
> [~staslev] I think you implemented this functionality originally? Want to
> take a look? CC [~amitsela]
> Run:
> https://builds.apache.org/job/beam_PostCommit_RunnableOnService_SparkLocal/org.apache.beam$beam-runners-spark/43/testReport/junit/org.apache.beam.sdk.transforms/FilterTest/testFilterGreaterThan/
> Error:
> {code}
> java.lang.RuntimeException: java.lang.IndexOutOfBoundsException: 5
>     at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:169)
>     at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:77)
>     at org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:53)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>     at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
>     at org.apache.beam.sdk.transforms.FilterTest.testFilterGreaterThan(FilterTest.java:122)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>     at org.apache.maven.surefire.junitcore.pc.Scheduler$1.run(Scheduler.java:393)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: 5
>     at scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:43)
>     at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:47)
>     at scala.collection.IndexedSeqOptimized$class.segmentLength(IndexedSeqOptimized.scala:189)
>     at scala.collection.mutable.ArrayBuffer.segmentLength(ArrayBuffer.scala:47)
>     at scala.collection.IndexedSeqOptimized$class.indexWhere(IndexedSeqOptimized.scala:198)
>     at scala.collection.mutable.ArrayBuffer.indexWhere(ArrayBuffer.scala:47)
>     at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:144)
>     at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
>     at scala.collection.GenSeqLike$class.indexOf(GenSeqLike.scala:128)
>     at scala.collection.AbstractSeq.indexOf(Seq.scala:40)
>     at scala.collection.mutable.BufferLike$class.$minus$eq(BufferLike.scala:126)
>     at scala.collection.mutable.AbstractBuffer.$minus$eq(Buffer.scala:48)
>     at org.apache.spark.metrics.MetricsSystem.removeSource(MetricsSystem.scala:159)
>     at org.apache.beam.runners.spark.translation.SparkRuntimeContext.registerMetrics(SparkRuntimeContext.java:94)
>     at org.apache.beam.runners.spark.translation.SparkRuntimeContext.<init>(SparkRuntimeContext.java:66)
>     at org.apache.beam.runners.spark.translation.EvaluationContext.<init>(EvaluationContext.java:73)
>     at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:146)
> {code}