[
https://issues.apache.org/jira/browse/HUDI-625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17041691#comment-17041691
]
Vinoth Chandar commented on HUDI-625:
-------------------------------------
Wondering if it's sufficient to just keep the payload/data part of the incoming
HoodieRecord in the ExternalSpillableMap. The other fields (partition path,
record location, etc.) are the same for all records coming in, so we could
attach them lazily on demand. This would cut the entry size down a lot and keep
deserialization simple.
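The lazy-attach idea above can be sketched in plain Java. This is a hypothetical illustration, not the actual Hudi classes: only the payload bytes are keyed per record, while the shared fields (partition path, file id) live once per merge handle and are re-attached when the record is read back.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch (hypothetical names, not Hudi code) of storing only the payload per
 * key in the spillable map, and rebuilding the full-record view on demand from
 * metadata that is identical for every record routed to one merge handle.
 */
public class PayloadOnlyMapSketch {
    // Shared once per file group / merge handle, not serialized per record.
    static final String PARTITION_PATH = "default";
    static final String FILE_ID = "499f8d2c-df6a-4275-9166-3de4ac91f3bf-0";

    // Only the payload is stored per key; this is what would spill to disk.
    static final Map<String, byte[]> payloadByKey = new HashMap<>();

    static void put(String recordKey, byte[] payload) {
        payloadByKey.put(recordKey, payload);
    }

    /** Lazily attach the shared fields when the record is fetched. */
    static String getFullRecord(String recordKey) {
        byte[] payload = payloadByKey.get(recordKey);
        if (payload == null) {
            return null;
        }
        return "HoodieRecord{key=" + recordKey
            + ", partitionPath=" + PARTITION_PATH
            + ", fileId=" + FILE_ID
            + ", payloadBytes=" + payload.length + "}";
    }

    public static void main(String[] args) {
        put("3675605", new byte[]{1, 2, 3});
        System.out.println(getFullRecord("3675605"));
    }
}
```

The point of the sketch: the serialized entry shrinks to the payload alone, so both the spilled file and each Kryo deserialize get smaller.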
> Address performance concerns on DiskBasedMap.get() during upsert of thin records
> --------------------------------------------------------------------------------
>
> Key: HUDI-625
> URL: https://issues.apache.org/jira/browse/HUDI-625
> Project: Apache Hudi (incubating)
> Issue Type: Improvement
> Components: Performance, Writer Core
> Reporter: Vinoth Chandar
> Assignee: Vinoth Chandar
> Priority: Major
> Fix For: 0.6.0
>
> Attachments: image-2020-02-20-23-34-24-155.png, image-2020-02-20-23-34-27-466.png, image-2020-02-21-15-35-56-637.png
>
>
> [https://github.com/apache/incubator-hudi/issues/1328]
>
> So what's going on here is that each entry (a single data field) is estimated
> to be around 500-750 bytes in memory, and things spill a lot...
> {code:java}
> 20/02/20 23:00:39 INFO ExternalSpillableMap: Estimated Payload size => 760 for 3675605,HoodieRecord{key=HoodieKey { recordKey=3675605 partitionPath=default}, currentLocation='HoodieRecordLocation {instantTime=20200220225748, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}', newLocation='HoodieRecordLocation {instantTime=20200220225921, fileId=499f8d2c-df6a-4275-9166-3de4ac91f3bf-0}'} {code}
>
> h2. Reproduce steps
>
> {code:java}
> export SPARK_HOME=/home/dockeradmin/hudi/spark-2.4.4-bin-hadoop2.7
> ${SPARK_HOME}/bin/spark-shell \
>   --executor-memory 6G \
>   --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
>   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
> {code}
>
> {code:java}
> val HUDI_FORMAT = "org.apache.hudi"
> val TABLE_NAME = "hoodie.table.name"
> val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
> val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
> val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
> val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
> val UPSERT_OPERATION_OPT_VAL = "upsert"
> val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
> val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
> val config = Map(
> "table_name" -> "example_table",
> "target" -> "file:///tmp/example_table/",
> "primary_key" -> "id",
> "sort_key" -> "id"
> )
> val readPath = config("target") + "/*"
> val json_data = (1 to 4000000).map(i => "{\"id\":" + i + "}")
> val jsonRDD = spark.sparkContext.parallelize(json_data, 2)
> val df1 = spark.read.json(jsonRDD)
> println(s"${df1.count()} records in source 1")
> df1.write.format(HUDI_FORMAT).
> option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
> option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
> option(TABLE_NAME, config("table_name")).
> option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL).
> option(BULK_INSERT_PARALLELISM, 1).
> mode("Overwrite").
> save(config("target"))
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
> // Runs very slow
> df1.limit(3000000).write.format(HUDI_FORMAT).
> option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
> option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
> option(TABLE_NAME, config("table_name")).
> option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
> option(UPSERT_PARALLELISM, 20).
> mode("Append").
> save(config("target"))
> // Runs very slow
> df1.write.format(HUDI_FORMAT).
> option(PRECOMBINE_FIELD_OPT_KEY, config("sort_key")).
> option(RECORDKEY_FIELD_OPT_KEY, config("primary_key")).
> option(TABLE_NAME, config("table_name")).
> option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL).
> option(UPSERT_PARALLELISM, 20).
> mode("Append").
> save(config("target"))
> println(s"${spark.read.format(HUDI_FORMAT).load(readPath).count()} records in Hudi table")
> {code}
>
>
>
> h2. *Analysis*
> h3. *Upsert (4000000 entries)*
> {code:java}
> WARN HoodieMergeHandle:
> Number of entries in MemoryBasedMap => 150875
> Total size in bytes of MemoryBasedMap => 83886580
> Number of entries in DiskBasedMap => 3849125
> Size of file spilled to disk => 1443046132
> {code}
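Back-computing per-entry footprint from the merge-handle log above shows where the estimate lands in practice: roughly 556 bytes per in-memory entry (close to the sampled 760-byte estimate), about 374 bytes per entry on disk, and ~96% of the 4M records spilling. The figures are copied verbatim from the log; the arithmetic is just division.

```java
// Arithmetic check on the ExternalSpillableMap stats quoted above.
public class SpillStats {
    // Figures copied from the HoodieMergeHandle log in the issue.
    static final long MEM_ENTRIES = 150_875L;
    static final long MEM_BYTES = 83_886_580L;      // ~80 MB in-memory budget
    static final long DISK_ENTRIES = 3_849_125L;
    static final long DISK_BYTES = 1_443_046_132L;  // ~1.4 GB spill file

    static long avgMemBytesPerEntry() { return MEM_BYTES / MEM_ENTRIES; }
    static long avgDiskBytesPerEntry() { return DISK_BYTES / DISK_ENTRIES; }
    static long percentSpilled() {
        return 100 * DISK_ENTRIES / (MEM_ENTRIES + DISK_ENTRIES);
    }

    public static void main(String[] args) {
        System.out.println("avg bytes/entry in memory: " + avgMemBytesPerEntry()); // 556
        System.out.println("avg bytes/entry on disk:   " + avgDiskBytesPerEntry()); // 374
        System.out.println("entries spilled:           " + percentSpilled() + "%"); // 96%
    }
}
```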
> h3. Hang stacktrace (DiskBasedMap#get)
>
> {code:java}
> "pool-21-thread-2" Id=696 cpuUsage=98% RUNNABLE
>     at java.util.zip.ZipFile.getEntry(Native Method)
>     at java.util.zip.ZipFile.getEntry(ZipFile.java:310)
>     - locked java.util.jar.JarFile@1fc27ed4
>     at java.util.jar.JarFile.getEntry(JarFile.java:240)
>     at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
>     at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
>     at sun.misc.URLClassPath.getResource(URLClassPath.java:212)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:365)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     - locked java.lang.Object@28f65251
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
>     - locked scala.reflect.internal.util.ScalaClassLoader$URLClassLoader@a353dff
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
>     - locked com.esotericsoftware.reflectasm.AccessClassLoader@2c7122e2
>     at com.esotericsoftware.reflectasm.AccessClassLoader.loadClass(AccessClassLoader.java:92)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at com.esotericsoftware.reflectasm.ConstructorAccess.get(ConstructorAccess.java:59)
>     - locked com.esotericsoftware.reflectasm.AccessClassLoader@2c7122e2
>     at org.apache.hudi.common.util.SerializationUtils$KryoInstantiator$KryoBase.lambda$newInstantiator$0(SerializationUtils.java:151)
>     at org.apache.hudi.common.util.SerializationUtils$KryoInstantiator$KryoBase$$Lambda$265/1458915834.newInstance(Unknown Source)
>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1139)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:562)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:538)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
>     at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:112)
>     at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:86)
>     at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:217)
>     at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:211)
>     at org.apache.hudi.common.util.collection.DiskBasedMap.get(DiskBasedMap.java:207)
>     at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:173)
>     at org.apache.hudi.common.util.collection.ExternalSpillableMap.get(ExternalSpillableMap.java:55)
>     at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:280)
>     at org.apache.hudi.table.HoodieCopyOnWriteTable$UpdateHandler.consumeOneRecord(HoodieCopyOnWriteTable.java:434)
>     at org.apache.hudi.table.HoodieCopyOnWriteTable$UpdateHandler.consumeOneRecord(HoodieCopyOnWriteTable.java:424)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor$$Lambda$76/1412692041.call(Unknown Source)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
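One plausible reading of this trace: each deserialize call reaches ConstructorAccess.get, which resolves an instantiator through the classloader (jar-entry scans under a lock), so the hot cost is repeated class/constructor resolution rather than byte copying. The generic fix pattern is to resolve once and cache per class. A stdlib-only illustration of that pattern (hypothetical, not Hudi or Kryo code):

```java
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Illustration of the resolve-once-and-cache pattern: the reflective lookup
 * (the part that walks the classloader in the stack trace above) runs once
 * per class instead of once per record.
 */
public class InstantiatorCache {
    private static final Map<Class<?>, Constructor<?>> CACHE = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    static <T> T newInstance(Class<T> clazz) throws Exception {
        // computeIfAbsent performs the expensive constructor resolution at
        // most once per class; later calls hit the map only.
        Constructor<?> ctor = CACHE.computeIfAbsent(clazz, c -> {
            try {
                Constructor<?> k = c.getDeclaredConstructor();
                k.setAccessible(true);
                return k;
            } catch (NoSuchMethodException e) {
                throw new IllegalStateException(e);
            }
        });
        return (T) ctor.newInstance();
    }

    public static void main(String[] args) throws Exception {
        StringBuilder a = newInstance(StringBuilder.class);
        StringBuilder b = newInstance(StringBuilder.class); // served from cache
        System.out.println(a != b); // distinct instances, shared constructor
    }
}
```

In Kryo terms the analogous lever would be reusing the Kryo/serializer instance across calls so its cached instantiators survive, rather than paying the resolution on every get.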
>
> h3. Average time of {{DiskBasedMap#get}}
>
> {code:java}
> $ monitor *DiskBasedMap get -c 12
> Affect(class-cnt:1 , method-cnt:4) cost in 221 ms.
>  timestamp            class         method  total  success  fail  avg-rt(ms)  fail-rate
> ---------------------------------------------------------------------------------------
>  2020-02-20 18:13:36  DiskBasedMap  get     5814   5814     0     6.12        0.00%
>
>  timestamp            class         method  total  success  fail  avg-rt(ms)  fail-rate
> ---------------------------------------------------------------------------------------
>  2020-02-20 18:13:48  DiskBasedMap  get     9117   9117     0     3.89        0.00%
>
>  timestamp            class         method  total  success  fail  avg-rt(ms)  fail-rate
> ---------------------------------------------------------------------------------------
>  2020-02-20 18:14:16  DiskBasedMap  get     8490   8490     0     4.10        0.00%
> {code}
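A back-of-envelope extrapolation of these samples, assuming roughly one sequential DiskBasedMap#get per spilled record (an assumption, not something the logs state directly):

```java
// Rough worst-case model: 3,849,125 spilled entries at ~4 ms per get, serially.
public class GetCostEstimate {
    static double totalHours(long diskEntries, double avgGetMs) {
        return diskEntries * avgGetMs / 3_600_000.0; // ms per hour
    }

    public static void main(String[] args) {
        long diskEntries = 3_849_125L; // from the merge-handle stats above
        double avgGetMs = 4.0;         // roughly the 3.89-6.12 ms samples
        System.out.printf("~%.1f hours of DiskBasedMap#get per task%n",
            totalHours(diskEntries, avgGetMs));
    }
}
```

That comes out to over four hours of pure get() time for a single merge task, which is consistent with the writer appearing to hang.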
>
> h3. Call time trace
> {code:java}
> thread-2;id=194;is_daemon=false;priority=5;TCCL=org.apache.spark.repl.ExecutorClassLoader@7a47bc29
> `---[4.361707ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
>     +---[0.001704ms] java.util.Map:get()
>     `---[4.344261ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
>         `---[4.328981ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
>             +---[0.00122ms] org.apache.hudi.common.util.collection.DiskBasedMap:getRandomAccessFile()
>             `---[4.313586ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
>                 `---[4.283509ms] org.apache.hudi.common.util.collection.DiskBasedMap:get()
>                     +---[0.001169ms] org.apache.hudi.common.util.collection.DiskBasedMap$ValueMetadata:getOffsetOfValue()
>                     +---[7.1E-4ms] java.lang.Long:longValue()
>                     +---[6.97E-4ms] org.apache.hudi.common.util.collection.DiskBasedMap$ValueMetadata:getSizeOfValue()
>                     +---[0.036483ms] org.apache.hudi.common.util.SpillableMapUtils:readBytesFromDisk()
>                     `---[4.201996ms] org.apache.hudi.common.util.SerializationUtils:deserialize(){code}
> h3. Kryo deserialize performance test
>
> {code:java}
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.hudi.common.util.SerializationUtils;
>
> import java.util.LinkedList;
> import java.util.List;
> import java.util.Random;
>
> /**
>  * Micro-benchmark: Kryo deserialization cost for a single Avro record via SerializationUtils.
>  */
> public class TestSerializationUtils {
>
>   public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\","
>       + "\"name\": \"triprec\"," + "\"fields\": [ "
>       + "{\"name\": \"timestamp\",\"type\": \"double\"},"
>       + "{\"name\": \"_row_key\", \"type\": \"string\"},"
>       + "{\"name\": \"rider\", \"type\": \"string\"},"
>       + "{\"name\": \"driver\", \"type\": \"string\"},"
>       + "{\"name\": \"begin_lat\", \"type\": \"double\"},"
>       + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
>       + "{\"name\": \"end_lat\", \"type\": \"double\"},"
>       + "{\"name\": \"end_lon\", \"type\": \"double\"},"
>       + "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
>       + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"
>       + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
>
>   public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
>
>   public static GenericRecord generateGenericRecord() {
>     Random rand = new Random(46474747);
>     GenericRecord rec = new GenericData.Record(AVRO_SCHEMA);
>     rec.put("_row_key", "rowKey");
>     rec.put("timestamp", rand.nextDouble()); // schema declares a double, not a string
>     rec.put("rider", "riderName");
>     rec.put("driver", "driverName");
>     rec.put("begin_lat", rand.nextDouble());
>     rec.put("begin_lon", rand.nextDouble());
>     rec.put("end_lat", rand.nextDouble());
>     rec.put("end_lon", rand.nextDouble());
>     rec.put("_hoodie_is_deleted", false);
>     return rec;
>   }
>
>   public static void main(String[] args) throws Exception {
>     GenericRecord genericRecord = generateGenericRecord();
>     byte[] serializedObject = SerializationUtils.serialize(genericRecord);
>     List<Object> datas = new LinkedList<>();
>     long t1 = System.currentTimeMillis();
>     for (int i = 0; i < 1000; i++) {
>       datas.add(SerializationUtils.<GenericRecord>deserialize(serializedObject));
>     }
>     long t2 = System.currentTimeMillis();
>     System.out.println("dese times: " + datas.size());
>     System.out.println("dese cost: " + (t2 - t1) + "ms");
>   }
> }{code}
>
> !image-2020-02-21-15-35-56-637.png|width=404,height=165!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)