Hello!

I've been running some tests to measure the overhead of using Beam compared with using the underlying runner directly. For one of the tests I'm reading data from Kafka, and profiling shows that a lot of time is spent in the reflection part of KafkaIO, more specifically in the ConsumerSpEL class.

I'm not very familiar with the KafkaIO connector, but I was hoping someone here might know how to diagnose this further :)
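
To make the pattern in the profile concrete, here is a minimal sketch of the difference between resolving a method reflectively on every call and resolving it once and reusing it. This is not KafkaIO's code: ToyDeserializer, lookupEveryCall and CachedCall are made-up names for illustration, and the sketch uses plain java.lang.reflect instead of Spring SpEL. Whether ConsumerSpEL really behaves like the first variant is an assumption on my part, based only on ReflectiveMethodResolver.resolve showing up under deserializeKey and deserializeValue.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;

public class ReflectionLookupSketch {

    // Hypothetical stand-in for a deserializer; not a Kafka or Beam class.
    public static class ToyDeserializer {
        public String deserialize(String topic, byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }
    }

    // Variant 1: look the method up again on every call.
    public static Object lookupEveryCall(Object target, String topic, byte[] data) {
        try {
            Method m = target.getClass().getMethod("deserialize", String.class, byte[].class);
            return m.invoke(target, topic, data);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    // Variant 2: look the method up once, then reuse the Method object.
    public static final class CachedCall {
        private final Method method;

        public CachedCall(Class<?> type) {
            try {
                this.method = type.getMethod("deserialize", String.class, byte[].class);
            } catch (NoSuchMethodException e) {
                throw new IllegalStateException(e);
            }
        }

        public Object call(Object target, String topic, byte[] data) {
            try {
                return method.invoke(target, topic, data);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        ToyDeserializer deserializer = new ToyDeserializer();
        byte[] payload = "some record".getBytes(StandardCharsets.UTF_8);
        int iterations = 1_000_000;

        // Time the per-call lookup.
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            lookupEveryCall(deserializer, "topic", payload);
        }
        long perCallNanos = System.nanoTime() - start;

        // Time the cached lookup.
        CachedCall cached = new CachedCall(ToyDeserializer.class);
        start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            cached.call(deserializer, "topic", payload);
        }
        long cachedNanos = System.nanoTime() - start;

        System.out.println("lookup on every call: " + perCallNanos / 1_000_000 + " ms");
        System.out.println("cached Method:        " + cachedNanos / 1_000_000 + " ms");
    }
}
```

The loop in main is only a rough comparison, not a proper benchmark (no warm-up, no JMH), so the numbers it prints should be read as indicative at best.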

/ Teodor Spæren
Title: Hot Spots

Session: Remote attach
Time of export: Saturday, February 13, 2021 3:51:32 PM CET
JVM time: 06:24
  
Thread selection:   KafkaIO.Read/KafkaIO.Read/Read(KafkaUnboundedSource)/ParDo(UnboundedSourceAsSDFWrapper)/ParMultiDo(UnboundedSourceAsSDFWrapper)/ProcessKeyedElements/SplittableParDoViaKeyedWorkItems.ProcessElements -> KafkaIO.Read/KafkaIO.Read/Read(KafkaUnboundedSource)/ParDo(StripIds)/ParMultiDo(StripIds) -> KafkaIO.Read/Remove Kafka Metadata/ParMultiDo(Anonymous) -> Values/Values/Map/ParMultiDo(Anonymous) -> KafkaIO.KafkaValueWrite/Kafka values with default key/Map/ParMultiDo(Anonymous) -> KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous) -> KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter) (1/1)#0 [Flink Task Threads]
Thread status:   Runnable
Aggregation level:  Methods
Hot spot calculation:  Self times
Unprofiled classes:  Show separately


Hot Spot | Self Time | Average Time | Invocations
java.lang.reflect.Executable.sharedGetParameterAnnotations | 46,473 ms (18 %) | n/a | n/a
  18.1% - 46,473 ms java.lang.reflect.Method.getParameterAnnotations
  18.1% - 46,473 ms org.springframework.core.MethodParameter.getParameterAnnotations
  18.1% - 46,473 ms org.springframework.core.convert.TypeDescriptor.<init>
  11.1% - 28,542 ms org.springframework.expression.spel.support.ReflectiveMethodResolver.resolve
  7.0% - 17,931 ms org.springframework.expression.spel.support.ReflectionHelper.convertArguments
java.lang.Class.forName0 | 38,524 ms (15 %) | n/a | n/a
  15.0% - 38,524 ms java.lang.Class.forName
  15.0% - 38,524 ms sun.reflect.generics.factory.CoreReflectionFactory.makeNamedType
  15.0% - 38,524 ms sun.reflect.generics.visitor.Reifier.visitClassTypeSignature
  15.0% - 38,524 ms sun.reflect.generics.tree.ClassTypeSignature.accept
  15.0% - 38,524 ms sun.reflect.generics.repository.ConstructorRepository.computeParameterTypes
  15.0% - 38,524 ms sun.reflect.generics.repository.ConstructorRepository.getParameterTypes
  15.0% - 38,524 ms java.lang.reflect.Executable.getGenericParameterTypes
  15.0% - 38,524 ms java.lang.reflect.Method.getGenericParameterTypes
  15.0% - 38,524 ms org.springframework.core.MethodParameter.getGenericParameterType
  15.0% - 38,524 ms org.springframework.core.SerializableTypeWrapper$MethodParameterTypeProvider.getType
  15.0% - 38,524 ms org.springframework.core.SerializableTypeWrapper.forTypeProvider
  15.0% - 38,524 ms org.springframework.core.ResolvableType.forType
  15.0% - 38,524 ms org.springframework.core.ResolvableType.forMethodParameter
  15.0% - 38,524 ms org.springframework.core.ResolvableType.forMethodParameter
  15.0% - 38,524 ms org.springframework.core.convert.TypeDescriptor.<init>
  15.0% - 38,524 ms org.springframework.expression.spel.support.ReflectiveMethodResolver.resolve
  15.0% - 38,524 ms org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod
  15.0% - 38,524 ms org.springframework.expression.spel.ast.MethodReference.getValueInternal
  15.0% - 38,524 ms org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue
  15.0% - 38,524 ms org.springframework.expression.spel.ast.CompoundExpression.getValueInternal
  15.0% - 38,524 ms org.springframework.expression.spel.ast.SpelNodeImpl.getValue
  15.0% - 38,524 ms org.springframework.expression.spel.standard.SpelExpression.getValue
  15.0% - 38,524 ms org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateDeserializeWithHeaders
  7.7% - 19,856 ms org.apache.beam.sdk.io.kafka.ConsumerSpEL.deserializeKey
  7.3% - 18,668 ms org.apache.beam.sdk.io.kafka.ConsumerSpEL.deserializeValue
org.springframework.expression.spel.support.ReflectiveMethodResolver.resolve | 12,990 ms (5 %) | n/a | n/a
  5.1% - 12,990 ms org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod
  5.1% - 12,990 ms org.springframework.expression.spel.ast.MethodReference.getValueInternal
  5.1% - 12,990 ms org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue
  5.1% - 12,990 ms org.springframework.expression.spel.ast.CompoundExpression.getValueInternal
  5.1% - 12,990 ms org.springframework.expression.spel.ast.SpelNodeImpl.getValue
  5.1% - 12,990 ms org.springframework.expression.spel.standard.SpelExpression.getValue
  5.1% - 12,990 ms org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateDeserializeWithHeaders
  2.6% - 6,643 ms org.apache.beam.sdk.io.kafka.ConsumerSpEL.deserializeKey
  2.5% - 6,347 ms org.apache.beam.sdk.io.kafka.ConsumerSpEL.deserializeValue
java.lang.StringLatin1.replace | 10,404 ms (4 %) | n/a | n/a
org.springframework.core.ResolvableType.forMethodParameter | 10,073 ms (3 %) | n/a | n/a
java.util.HashMap.putVal | 9,663 ms (3 %) | n/a | n/a
java.util.TimSort.binarySort | 8,920 ms (3 %) | n/a | n/a
java.lang.Class.copyMethods | 7,974 ms (3 %) | n/a | n/a
sun.reflect.generics.parser.SignatureParser.skipIdentifier | 6,972 ms (2 %) | n/a | n/a
org.springframework.util.ConcurrentReferenceHashMap$Segment.getReference | 5,387 ms (2 %) | n/a | n/a
org.springframework.core.convert.support.GenericConversionService$ConverterCacheKey.equals | 4,959 ms (1 %) | n/a | n/a
org.springframework.core.ResolvableType.forType(java.lang.reflect.Type) | 4,708 ms (1 %) | n/a | n/a
sun.nio.ch.IOUtil.write1 | 4,029 ms (1 %) | n/a | n/a
org.springframework.util.ReflectionUtils.doWithMethods(java.lang.Class, org.springframework.util.ReflectionUtils$MethodCallback, org.springframework.util.ReflectionUtils$MethodFilter) | 3,858 ms (1 %) | n/a | n/a
org.springframework.expression.spel.support.ReflectionHelper.compareArguments | 3,783 ms (1 %) | n/a | n/a
org.apache.beam.sdk.transforms.Contextful.lambda$fn$36334a93$1 | 3,634 ms (1 %) | n/a | n/a
org.springframework.core.ResolvableType.forType(java.lang.reflect.Type, org.springframework.core.ResolvableType$VariableResolver) | 3,554 ms (1 %) | n/a | n/a
java.util.HashMap.hash | 3,422 ms (1 %) | n/a | n/a
org.springframework.expression.spel.ast.MethodReference.getValueRef | 3,245 ms (1 %) | n/a | n/a
java.util.AbstractCollection.addAll | 3,058 ms (1 %) | n/a | n/a
java.util.HashMap.put | 2,757 ms (1 %) | n/a | n/a
org.springframework.expression.spel.ast.MethodReference.getValueInternal | 2,492 ms (0 %) | n/a | n/a
org.springframework.core.BridgeMethodResolver.findBridgedMethod | 2,473 ms (0 %) | n/a | n/a
sun.reflect.generics.factory.CoreReflectionFactory.findTypeVariable | 2,307 ms (0 %) | n/a | n/a
sun.reflect.generics.parser.SignatureParser.parseZeroOrMoreTypeSignatures | 2,274 ms (0 %) | n/a | n/a
org.apache.kafka.clients.producer.internals.RecordAccumulator.append | 2,119 ms (0 %) | n/a | n/a
org.springframework.expression.spel.support.ReflectionHelper.getTypeDifferenceWeight | 1,831 ms (0 %) | n/a | n/a
java.lang.PublicMethods$MethodList.filter | 1,762 ms (0 %) | n/a | n/a
java.util.HashMap.getNode | 1,666 ms (0 %) | n/a | n/a
org.springframework.util.ConcurrentReferenceHashMap.purgeUnreferencedEntries | 1,601 ms (0 %) | n/a | n/a
java.util.ArrayList.<init> | 1,501 ms (0 %) | n/a | n/a
org.apache.kafka.common.utils.ByteUtils.sizeOfUnsignedVarint | 1,351 ms (0 %) | n/a | n/a
org.springframework.util.ReflectionUtils.getAllDeclaredMethods | 1,346 ms (0 %) | n/a | n/a
java.util.Objects.equals | 1,284 ms (0 %) | n/a | n/a
org.springframework.util.ObjectUtils.nullSafeEquals | 1,217 ms (0 %) | n/a | n/a
org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute | 1,197 ms (0 %) | n/a | n/a
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance | 1,118 ms (0 %) | n/a | n/a
org.springframework.core.convert.support.ConversionUtils.invokeConverter | 1,109 ms (0 %) | n/a | n/a
org.springframework.util.ConcurrentReferenceHashMap.getHash | 1,092 ms (0 %) | n/a | n/a
org.apache.kafka.clients.producer.KafkaProducer.doSend | 1,058 ms (0 %) | n/a | n/a
sun.reflect.generics.parser.SignatureParser.parsePackageNameAndSimpleClassTypeSignature | 950 ms (0 %) | n/a | n/a
org.apache.kafka.clients.producer.ProducerRecord.<init> | 889 ms (0 %) | n/a | n/a
org.apache.kafka.common.serialization.StringDeserializer.deserialize | 889 ms (0 %) | n/a | n/a
java.lang.StringBuilder.<init> | 879 ms (0 %) | n/a | n/a
sun.reflect.generics.parser.SignatureParser.<init> | 872 ms (0 %) | n/a | n/a
org.springframework.expression.spel.ast.MethodReference.updateExitTypeDescriptor | 825 ms (0 %) | n/a | n/a
org.springframework.expression.spel.support.StandardEvaluationContext.getMethodResolvers | 756 ms (0 %) | n/a | n/a
sun.reflect.generics.visitor.Reifier.visitArrayTypeSignature | 723 ms (0 %) | n/a | n/a
java.io.DataOutputStream.writeByte | 691 ms (0 %) | n/a | n/a
sun.reflect.generics.parser.SignatureParser.parseReturnType | 690 ms (0 %) | n/a | n/a
org.apache.beam.sdk.io.kafka.TimestampPolicyFactory$ProcessingTimePolicy.getTimestampForRecord | 648 ms (0 %) | n/a | n/a
java.lang.StringBuilder.append | 565 ms (0 %) | n/a | n/a
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue | 533 ms (0 %) | n/a | n/a
sun.reflect.generics.parser.SignatureParser.parseFieldTypeSignature | 526 ms (0 %) | n/a | n/a
java.lang.reflect.Method.invoke | 515 ms (0 %) | n/a | n/a
java.util.TimSort.countRunAndMakeAscending | 507 ms (0 %) | n/a | n/a
org.springframework.core.SerializableTypeWrapper$TypeProxyInvocationHandler.invoke | 502 ms (0 %) | n/a | n/a
jdk.internal.misc.Unsafe.unpark | 491 ms (0 %) | n/a | n/a
org.springframework.util.ConcurrentReferenceHashMap.getReference | 470 ms (0 %) | n/a | n/a
org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateDeserializeWithHeaders | 457 ms (0 %) | n/a | n/a
org.springframework.expression.spel.CodeFlow.toDescriptorFromObject | 440 ms (0 %) | n/a | n/a
java.io.DataOutputStream.write | 433 ms (0 %) | n/a | n/a
org.springframework.util.ObjectUtils.nullSafeHashCode | 423 ms (0 %) | n/a | n/a
org.springframework.expression.spel.support.StandardEvaluationContext.initMethodResolvers | 407 ms (0 %) | n/a | n/a
sun.reflect.generics.parser.SignatureParser.parseTypeSignature | 406 ms (0 %) | n/a | n/a
org.springframework.core.ResolvableType.equals | 394 ms (0 %) | n/a | n/a
java.lang.reflect.Method.isVarArgs | 385 ms (0 %) | n/a | n/a
java.util.Collections$UnmodifiableCollection$1.<init> | 376 ms (0 %) | n/a | n/a
org.springframework.core.convert.TypeDescriptor.forObject | 362 ms (0 %) | n/a | n/a
org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue | 356 ms (0 %) | n/a | n/a
org.apache.kafka.common.serialization.StringSerializer.serialize | 351 ms (0 %) | n/a | n/a
java.lang.reflect.Executable.equalParamTypes | 322 ms (0 %) | n/a | n/a
java.lang.StringBuilder.toString | 309 ms (0 %) | n/a | n/a
sun.reflect.generics.parser.SignatureParser.parseArrayTypeSignature | 301 ms (0 %) | n/a | n/a
java.lang.Object.hashCode | 290 ms (0 %) | n/a | n/a
java.io.ObjectOutputStream$BlockDataOutputStream.getUTFLength | 279 ms (0 %) | n/a | n/a
java.util.Arrays.hashCode | 275 ms (0 %) | n/a | n/a
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire | 274 ms (0 %) | n/a | n/a
sun.reflect.generics.tree.ClassTypeSignature.accept | 251 ms (0 %) | n/a | n/a
org.springframework.expression.spel.standard.SpelExpression.getValue | 234 ms (0 %) | n/a | n/a
org.apache.kafka.common.record.CompressionRatioEstimator.estimation | 224 ms (0 %) | n/a | n/a
java.lang.Throwable.fillInStackTrace | 221 ms (0 %) | n/a | n/a
java.io.ObjectOutputStream$BlockDataOutputStream.writeBytes | 219 ms (0 %) | n/a | n/a
org.springframework.expression.spel.ast.CompoundExpression.getValueRef | 203 ms (0 %) | n/a | n/a
org.springframework.expression.spel.ast.MethodReference.getCachedExecutor | 202 ms (0 %) | n/a | n/a
sun.reflect.generics.parser.SignatureParser.parseMethodSig | 193 ms (0 %) | n/a | n/a
org.apache.kafka.common.record.DefaultRecord.writeTo | 185 ms (0 %) | n/a | n/a
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement | 177 ms (0 %) | n/a | n/a
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode | 174 ms (0 %) | n/a | n/a
org.springframework.util.ConcurrentReferenceHashMap$Segment.findInChain | 170 ms (0 %) | n/a | n/a
org.apache.kafka.common.record.MemoryRecordsBuilder.<init> | 164 ms (0 %) | n/a | n/a
java.io.ObjectOutputStream$HandleTable.lookup | 154 ms (0 %) | n/a | n/a
java.io.ObjectOutputStream.writeObject0 | 129 ms (0 %) | n/a | n/a
org.springframework.core.convert.support.ArrayToArrayConverter.convert | 124 ms (0 %) | n/a | n/a
org.apache.kafka.clients.producer.KafkaProducer.send | 124 ms (0 %) | n/a | n/a
org.springframework.util.ReflectionUtils.doWithMethods(java.lang.Class, org.springframework.util.ReflectionUtils$MethodCallback) | 123 ms (0 %) | n/a | n/a
java.util.concurrent.ConcurrentHashMap.replaceNode | 110 ms (0 %) | n/a | n/a
java.lang.reflect.Method.copy | 106 ms (0 %) | n/a | n/a
org.springframework.expression.spel.support.ReflectiveMethodResolver.getMethods | 105 ms (0 %) | n/a | n/a
java.util.concurrent.locks.LockSupport.park | 96,238 µs (0 %) | n/a | n/a
