dev, chenliang6136 

Hi,
Environment: Spark 2.1.1, CarbonData 1.1.1, Hadoop 2.7.2

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
val cc = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("hdfs://ns1/user/e_carbon/public/carbon.store")
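The store path passed to getOrCreateCarbonSession can be sanity-checked from the same spark-shell before building the session (a minimal sketch using the Hadoop FileSystem API; sc is the SparkContext the shell provides, and this check is not part of the original script):

import org.apache.hadoop.fs.Path

// Confirm the CarbonData store location is reachable on HDFS.
val storePath = new Path("hdfs://ns1/user/e_carbon/public/carbon.store")
val fs = storePath.getFileSystem(sc.hadoopConfiguration)
println(s"store path exists: ${fs.exists(storePath)}")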
 


The following steps are executed repeatedly:


--- Deduplicate the incremental table (keep the latest record per prod_inst_id)
cc.sql(" cache table cache_prod_inst_add as select PROD_INST_ID,case when 
evttype='INSERT' then '70A' WHEN evttype='UPDATE' then '70A' ELSE 'DEL' END 
ODS_STATE,KAFKA_DATE,IND,GTID,PRODUCT_ID,ACC_PROD_INST_ID,ADDRESS_ID,OWNER_CUST_ID,PAYMENT_MODE_CD,PRODUCT_PASSWORD,IMPORTANT_LEVEL,AREA_CODE,ACC_NBR,EXCH_ID,COMMON_REGION_ID,REMARK,PAY_CYCLE,BEGIN_RENT_TIME,STOP_RENT_TIME,FINISH_TIME,STOP_STATUS,STATUS_CD,CREATE_DATE,STATUS_DATE,UPDATE_DATE,PROC_SERIAL,USE_CUST_ID,EXT_PROD_INST_ID,ADDRESS_DESC,AREA_ID,UPDATE_STAFF,CREATE_STAFF,REC_UPDATE_DATE,ACCOUNT,VERSION,COMMUNITY_ID,EXT_ACC_PROD_INST_ID,DISTRIBUTOR_ID,SHARDING_ID,REC_TIME
  from (select a.*, row_number() over(partition by a.prod_inst_id order by 
a.gtid desc ,a.ind desc) rownum from  e_carbon.prod_inst_ADD_H a ) b where 
b.rownum=1").show
----- Find the rows that already exist in the base table e_carbon.prod_inst_c
cc.sql("cache table cache_prod_inst_exist as select a.* from 
e_carbon.prod_inst_c a,cache_prod_inst_add b where 
a.prod_inst_id=b.prod_inst_id").show;
----- Compare versions (gtid, ind) to find the rows that need updating
cc.sql("cache table cache_prod_inst_U as select prod_inst_id, 
OWNER_CUST_ID,ACC_PROD_INST_ID,'0' DVERSION,GTID,IND,ODS_STATE, 'UMQ' 
SRC,date_format(current_timestamp(),'yyyyMMddhhmmss') 
ods_date,kafka_date,PRODUCT_ID,ADDRESS_ID,PAYMENT_MODE_CD,PRODUCT_PASSWORD,IMPORTANT_LEVEL,AREA_CODE,ACC_NBR,EXCH_ID,COMMON_REGION_ID,REMARK,PAY_CYCLE,BEGIN_RENT_TIME,STOP_RENT_TIME,FINISH_TIME,STOP_STATUS,STATUS_CD,CREATE_DATE,STATUS_DATE,UPDATE_DATE,PROC_SERIAL,USE_CUST_ID,EXT_PROD_INST_ID,ADDRESS_DESC,AREA_ID,UPDATE_STAFF,CREATE_STAFF,REC_UPDATE_DATE,ACCOUNT,VERSION,COMMUNITY_ID,EXT_ACC_PROD_INST_ID,DISTRIBUTOR_ID,SHARDING_ID
 from (select b.* from cache_prod_inst_exist a,cache_prod_inst_add b where 
a.prod_inst_id=b.prod_inst_id  and  b.gtid>a.gtid union all select b.* from  
cache_prod_inst_exist a,cache_prod_inst_add b where 
a.prod_inst_id=b.prod_inst_id and  a.gtid=b.gtid and b.ind>a.ind)").show;
-- UPDATE the existing rows
cc.sql("update e_carbon.prod_inst_c  A set 
(a.OWNER_CUST_ID,a.ACC_PROD_INST_ID,a.DVERSION,a.GTID,a.IND,a.ODS_STATE,A.SRC,a.ods_date,a.kafka_date,a.PRODUCT_ID,a.ADDRESS_ID,a.PAYMENT_MODE_CD,a.PRODUCT_PASSWORD,a.IMPORTANT_LEVEL,a.AREA_CODE,a.ACC_NBR,a.EXCH_ID,a.COMMON_REGION_ID,a.REMARK,a.PAY_CYCLE,a.BEGIN_RENT_TIME,a.STOP_RENT_TIME,a.FINISH_TIME,a.STOP_STATUS,a.STATUS_CD,a.CREATE_DATE,a.STATUS_DATE,a.UPDATE_DATE,a.PROC_SERIAL,a.USE_CUST_ID,a.EXT_PROD_INST_ID,a.ADDRESS_DESC,a.AREA_ID,a.UPDATE_STAFF,a.CREATE_STAFF,a.REC_UPDATE_DATE,a.ACCOUNT,a.VERSION,a.COMMUNITY_ID,a.EXT_ACC_PROD_INST_ID,a.DISTRIBUTOR_ID,a.SHARDING_ID)=(select
 
b.OWNER_CUST_ID,b.ACC_PROD_INST_ID,B.DVERSION,b.GTID,b.IND,B.ODS_STATE,B.SRC,b.ods_date,b.kafka_date,b.PRODUCT_ID,b.ADDRESS_ID,b.PAYMENT_MODE_CD,b.PRODUCT_PASSWORD,b.IMPORTANT_LEVEL,b.AREA_CODE,b.ACC_NBR,b.EXCH_ID,b.COMMON_REGION_ID,b.REMARK,b.PAY_CYCLE,b.BEGIN_RENT_TIME,b.STOP_RENT_TIME,b.FINISH_TIME,b.STOP_STATUS,b.STATUS_CD,b.CREATE_DATE,b.STATUS_DATE,b.UPDATE_DATE,b.PROC_SERIAL,b.USE_CUST_ID,b.EXT_PROD_INST_ID,b.ADDRESS_DESC,b.AREA_ID,b.UPDATE_STAFF,b.CREATE_STAFF,b.REC_UPDATE_DATE,b.ACCOUNT,b.VERSION,b.COMMUNITY_ID,b.EXT_ACC_PROD_INST_ID,b.DISTRIBUTOR_ID,b.SHARDING_ID
 from cache_prod_inst_u b where b.prod_inst_id=a.prod_inst_id)").show;
--- INSERT the newly added rows
cc.sql(" insert into e_carbon.prod_inst_c select  
PROD_INST_ID,OWNER_CUST_ID,ACC_PROD_INST_ID,'0' 
DVERSION,GTID,IND,ODS_STATE,'MQ' 
SRC,date_format(current_timestamp(),'yyyyMMddhhmmss') 
ods_date,kafka_date,PRODUCT_ID,ADDRESS_ID,PAYMENT_MODE_CD,PRODUCT_PASSWORD,IMPORTANT_LEVEL,AREA_CODE,ACC_NBR,EXCH_ID,COMMON_REGION_ID,REMARK,PAY_CYCLE,BEGIN_RENT_TIME,STOP_RENT_TIME,FINISH_TIME,STOP_STATUS,STATUS_CD,CREATE_DATE,STATUS_DATE,UPDATE_DATE,PROC_SERIAL,USE_CUST_ID,EXT_PROD_INST_ID,ADDRESS_DESC,AREA_ID,UPDATE_STAFF,CREATE_STAFF,REC_UPDATE_DATE,ACCOUNT,VERSION,COMMUNITY_ID,EXT_ACC_PROD_INST_ID,DISTRIBUTOR_ID,SHARDING_ID
 from (select a.*,b.prod_Inst_id bprod_inst_id  from cache_prod_inst_add  a 
left outer join cache_prod_inst_exist b on a.prod_inst_id=b.prod_inst_id) c 
where c.bprod_inst_id is null").show;


 cc.sql("select area_code,count(*) from e_carbon.prod_inst_c group by 
area_code").show;    
Executing this query now fails with:

"Lost task 16.1 in stage 0.0 (TID 51, HDD014, executor 4):
org.apache.carbondata.core.datastore.exception.IndexBuilderException:
at org.apache.carbondata.core.datastore.BlockIndexStore.getAll(BlockIndexStore.java:186)"

The table is now unavailable. How can I recover it?
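For reference, one way to inspect the table's state before attempting recovery is to list its segments (a minimal sketch, assuming CarbonData 1.1 SQL syntax; this command is not part of the original script):

// Show each segment of the table with its load status.
cc.sql("SHOW SEGMENTS FOR TABLE e_carbon.prod_inst_c").show(false)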



yixu2001
 
From: Liang Chen
Date: 2017-09-13 21:43
To: dev
Subject: Re: Block B-tree loading failed
Hi 
 
It looks like the path is invalid. Can you provide the full script showing how
you created the CarbonSession?
-----------------------------
Caused by:
org.apache.carbondata.core.datastore.exception.IndexBuilderException:
Invalid carbon data file:
hdfs://ns1/user/e_carbon/public/carbon.store/e_carbon/prod_inst_cold/Fact/Part0/Segment_0/part-0-30_batchno0-0-1505272524271.carbondata
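One quick check is whether the file named in the exception is actually present and non-empty (a sketch using the Hadoop FileSystem API from the same spark-shell; the path below is copied from the stack trace above):

import org.apache.hadoop.fs.Path

// Inspect the exact .carbondata file the exception complains about.
val dataFile = new Path("hdfs://ns1/user/e_carbon/public/carbon.store/e_carbon/prod_inst_cold/Fact/Part0/Segment_0/part-0-30_batchno0-0-1505272524271.carbondata")
val fs = dataFile.getFileSystem(sc.hadoopConfiguration)
if (fs.exists(dataFile)) println(s"file length: ${fs.getFileStatus(dataFile).getLen} bytes")
else println("file is missing")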
  
 
 
 
--
Sent from: http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/
