[ https://issues.apache.org/jira/browse/CARBONDATA-4276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chetan Bhat updated CARBONDATA-4276:
------------------------------------
    Description:

*With Carbon 2.2.0 on a Spark 2.4.5 cluster*

*Steps:*

*+In HDFS, execute the following commands:+*

cd /opt/HA/C10/install/hadoop/datanode/bin/
./hdfs dfs -rm -r /tmp/stream_test/checkpoint_all_data
./hdfs dfs -mkdir -p /tmp/stream_test/\{checkpoint_all_data,bad_records_all_data}
./hdfs dfs -mkdir -p /Priyesh/streaming/csv/
./hdfs dfs -cp /chetan/100_olap_C20.csv /Priyesh/streaming/csv/
./hdfs dfs -cp /Priyesh/streaming/csv/100_olap_C20.csv /Priyesh/streaming/csv/100_olap_C21.csv

*+From Spark-beeline / Spark-sql / Spark-shell, execute:+*

DROP TABLE IF EXISTS all_datatypes_2048;

create table all_datatypes_2048 (imei string, deviceInformationId int, MAC string, deviceColor string, device_backColor string, modelId string, marketName string, AMSize string, ROMSize string, CUPAudit string, CPIClocked string, series string, productionDate timestamp, bomCode string, internalModels string, deliveryTime string, channelsId string, channelsName string, deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string, deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string, Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string, gamePointId double, contractNumber BigInt) stored as carbondata TBLPROPERTIES('table_blocksize'='2048', 'streaming'='true', 'sort_columns'='imei');

*+From Spark-shell, execute:+*

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.Trigger.ProcessingTime

val df_j = spark.readStream.text("hdfs://hacluster/Priyesh/streaming/csv/*.csv")

df_j.writeStream.format("carbondata")
  .option("dbName", "ranjan")
  .option("carbon.stream.parser", "org.apache.carbondata.streaming.parser.CSVStreamParserImp")
  .option("checkpointLocation", "hdfs://hacluster/tmp/stream_test/checkpoint_all_data")
  .option("bad_records_action", "hdfs://hacluster/tmp/stream_test/bad_records_all_data")
  .option("tableName", "all_datatypes_2048")
  .trigger(ProcessingTime(6000))
  .option("carbon.streaming.auto.handoff.enabled", "true")
  .option("carbon.streaming.segment.max.size", 102400)
  .start

show segments for table all_datatypes_2048;

*Issue 1:*

*+When the CSV file is copied into the HDFS folder for the first time after streaming has started, writeStream fails with the following error:+*

scala> df_j.writeStream.format("carbondata").option("dbName","ranjan").option("carbon.stream.parser","org.apache.carbondata.streaming.parser.CSVStreamParserImp").option("checkpointLocation", "hdfs://hacluster/tmp/stream_test/checkpoint_all_data").option("bad_records_action","hdfs://hacluster/tmp/stream_test/bad_records_all_data").option("tableName","all_datatypes_2048").trigger(ProcessingTime(6000)).option("carbon.streaming.auto.handoff.enabled","true").option("carbon.streaming.segment.max.size",102400).start
21/08/26 12:53:11 WARN CarbonProperties: The enable mv value "null" is invalid. Using the default value "true"
21/08/26 12:53:11 WARN CarbonProperties: The value "LOCALLOCK" configured for key carbon.lock.type is invalid for current file system. Use the default value HDFSLOCK instead.
21/08/26 12:53:12 WARN HiveConf: HiveConf of name hive.metastore.rdb.password.decode.enable does not exist
21/08/26 12:53:12 WARN HiveConf: HiveConf of name hive.metastore.db.ssl.enabled does not exist
21/08/26 12:53:13 WARN HiveConf: HiveConf of name hive.metastore.rdb.password.decode.enable does not exist
21/08/26 12:53:13 WARN HiveConf: HiveConf of name hive.metastore.db.ssl.enabled does not exist
21/08/26 12:53:14 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
res0: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@ad038f8

scala> 21/08/26 13:00:49 WARN DFSClient: DataStreamer Exception
java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK], DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK]], original=[DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK], DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK]]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.findNewDatanode(DFSOutputStream.java:925)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:988)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1156)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:454)
21/08/26 13:00:49 ERROR CarbonUtil: Error while closing stream:java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK], DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK]], original=[DatanodeInfoWithStorage[7.187.185.59:50076,DS-443c5cbf-19e9-477e-afdb-92e8807730e3,DISK], DatanodeInfoWithStorage[7.187.185.158:50076,DS-67d21505-b7fd-4ccc-b7e5-7c83c042f3e3,DISK]]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
*Issue 2:*

*+When the CSV file is copied into the HDFS folder a second time after streaming has started, writeStream fails with:+*

21/08/26 13:01:36 ERROR StreamSegment: Failed to append batch data to stream segment: hdfs://hacluster/user/sparkhive/warehouse/ranjan.db/all_datatypes_2048/Fact/Part0/Segment_0
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to APPEND_FILE /user/sparkhive/warehouse/ranjan.db/all_datatypes_2048/Fact/Part0/Segment_0/part-0-0_batchno0-0-0-0.snappy.carbondata for DFSClient_NONMAPREDUCE_260546362_1 on 7.187.185.158 because DFSClient_NONMAPREDUCE_260546362_1 is already the current lease holder.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2540)
    at org.apache.hadoop.hdfs.server.namenode.FSDirAppendOp.appendFile(FSDirAppendOp.java:124)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2624)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:805)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:487)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:872)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:818)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2678)
    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy17.append(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy18.append(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1808)
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:348)
    at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:318)
    at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1166)
    at org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile.getDataOutputStreamUsingAppend(AbstractDFSCarbonFile.java:440)
    at org.apache.carbondata.core.datastore.impl.FileFactory.getDataOutputStreamUsingAppend(FileFactory.java:348)
    at org.apache.carbondata.streaming.CarbonStreamRecordWriter.initializeAtFirstRow(CarbonStreamRecordWriter.java:176)
    at org.apache.carbondata.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:210)
    at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:278)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:349)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:351)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:271)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
21/08/26 13:01:36 ERROR Utils: Aborting task
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to APPEND_FILE /user/sparkhive/warehouse/ranjan.db/all_datatypes_2048/Fact/Part0/Segment_0/part-0-0_batchno0-0-0-0.snappy.carbondata for DFSClient_NONMAPREDUCE_260546362_1 on 7.187.185.158 because DFSClient_NONMAPREDUCE_260546362_1 is already the current lease holder.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2540)
    at org.apache.hadoop.hdfs.server.namenode.FSDirAppendOp.appendFile(FSDirAppendOp.java:124)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2624)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:805)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:487)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:872)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:818)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2678)
    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy17.append(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy18.append(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1808)
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:348)
    at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:318)
    at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1166)
    at org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile.getDataOutputStreamUsingAppend(AbstractDFSCarbonFile.java:440)
    at org.apache.carbondata.core.datastore.impl.FileFactory.getDataOutputStreamUsingAppend(FileFactory.java:348)
    at org.apache.carbondata.streaming.CarbonStreamRecordWriter.initializeAtFirstRow(CarbonStreamRecordWriter.java:176)
    at org.apache.carbondata.streaming.CarbonStreamRecordWriter.write(CarbonStreamRecordWriter.java:210)
    at org.apache.carbondata.streaming.segment.StreamSegment.appendBatchData(StreamSegment.java:278)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply$mcV$sp(CarbonAppendableStreamSink.scala:349)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$writeDataFileTask$1.apply(CarbonAppendableStreamSink.scala:338)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$.writeDataFileTask(CarbonAppendableStreamSink.scala:351)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:271)
    at org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink$$anonfun$liftedTree1$1$1.apply(CarbonAppendableStreamSink.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
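For context on the AlreadyBeingCreatedException in issue 2 (not from the original report): the NameNode rejects an append when the requesting DFS client still holds the lease on the target file from an earlier output stream that was never closed, which matches the "already the current lease holder" text above. A minimal, hypothetical Spark-shell sketch of that failure mode using the plain Hadoop FileSystem API (not CarbonData internals; the path is a placeholder):

// Hypothetical standalone sketch: re-opening the same HDFS file for append from
// one client while the previous output stream is still open reproduces the
// "already the current lease holder" AlreadyBeingCreatedException seen above.
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new URI("hdfs://hacluster"), new Configuration())
val p = new Path("/tmp/lease_demo/file.bin")

val created = fs.create(p)   // make sure the file exists before appending
created.close()

val first = fs.append(p)     // acquires the HDFS lease on the file
first.write(1)
// 'first' is intentionally left open here, so this client still holds the lease

// A second append from the same client is rejected by the NameNode with
// AlreadyBeingCreatedException, the same error surfaced above from
// CarbonStreamRecordWriter.initializeAtFirstRow via FileSystem.append.
val second = fs.append(p)

If a crashed or aborted writer leaves such a lease dangling, running hdfs debug recoverLease -path <file> against the affected carbondata file can usually release it before the scenario is retried.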
> writestream fail when csv is copied to readstream hdfs path in Spark 2.4.5
> --------------------------------------------------------------------------
>
>                 Key: CARBONDATA-4276
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-4276
>             Project: CarbonData
>          Issue Type: Bug
>          Components: data-load
>    Affects Versions: 2.2.0
>        Environment: Spark 2.4.5
>            Reporter: PRIYESH RANJAN
>            Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)